You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fr...@apache.org on 2020/06/17 03:29:51 UTC

[calcite-avatica-go] branch master updated: [CALCITE-4067] Add support for ExecuteBatchRequest in prepared statement (chenhualin)

This is an automated email from the ASF dual-hosted git repository.

francischuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite-avatica-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 017ef8b  [CALCITE-4067] Add support for ExecuteBatchRequest in prepared statement (chenhualin)
017ef8b is described below

commit 017ef8b95a4ce7489993e0fef14d20c596826f14
Author: geange <fa...@outlook.com>
AuthorDate: Tue Jun 16 10:48:42 2020 +0800

    [CALCITE-4067] Add support for ExecuteBatchRequest in prepared statement (chenhualin)
---
 class_mappings.go                 |   4 ++
 connection.go                     |  14 ++---
 driver_hsqldb_test.go             | 120 ++++++++++++++++++++++++++++++++++++++
 driver_phoenix_test.go            | 120 ++++++++++++++++++++++++++++++++++++++
 dsn.go                            |   8 +++
 site/_docs/go_client_reference.md |  29 +++++++++
 statement.go                      |  38 ++++++++++--
 7 files changed, 320 insertions(+), 13 deletions(-)

diff --git a/class_mappings.go b/class_mappings.go
index 086af31..90420f0 100644
--- a/class_mappings.go
+++ b/class_mappings.go
@@ -56,6 +56,8 @@ func classNameFromRequest(message interface{}) string {
 		class = "DatabasePropertyRequest"
 	case *avaticaMessage.ExecuteRequest:
 		class = "ExecuteRequest"
+	case *avaticaMessage.ExecuteBatchRequest:
+		class = "ExecuteBatchRequest"
 	case *avaticaMessage.FetchRequest:
 		class = "FetchRequest"
 	case *avaticaMessage.OpenConnectionRequest:
@@ -104,6 +106,8 @@ func responseFromClassName(className string) (proto.Message, error) {
 		return &avaticaMessage.ErrorResponse{}, nil
 	case "ExecuteResponse":
 		return &avaticaMessage.ExecuteResponse{}, nil
+	case "ExecuteBatchResponse":
+		return &avaticaMessage.ExecuteBatchResponse{}, nil
 	case "FetchResponse":
 		return &avaticaMessage.FetchResponse{}, nil
 	case "OpenConnectionResponse":
diff --git a/connection.go b/connection.go
index 4eba2ef..d7d1d5b 100644
--- a/connection.go
+++ b/connection.go
@@ -20,7 +20,6 @@ package avatica
 import (
 	"context"
 	"database/sql/driver"
-
 	"github.com/apache/calcite-avatica-go/v4/errors"
 	"github.com/apache/calcite-avatica-go/v4/message"
 	"golang.org/x/xerrors"
@@ -56,10 +55,11 @@ func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
 	prepareResponse := response.(*message.PrepareResponse)
 
 	return &stmt{
-		statementID: prepareResponse.Statement.Id,
-		conn:        c,
-		parameters:  prepareResponse.Statement.Signature.Parameters,
-		handle:      *prepareResponse.Statement,
+		statementID:  prepareResponse.Statement.Id,
+		conn:         c,
+		parameters:   prepareResponse.Statement.Signature.Parameters,
+		handle:       *prepareResponse.Statement,
+		batchUpdates: make([]*message.UpdateBatch, 0),
 	}, nil
 }
 
@@ -117,9 +117,7 @@ func (c *conn) begin(ctx context.Context, isolationLevel isoLevel) (driver.Tx, e
 		return nil, c.avaticaErrorToResponseErrorOrError(err)
 	}
 
-	return &tx{
-		conn: c,
-	}, nil
+	return &tx{conn: c}, nil
 }
 
 // Exec prepares and executes a query and returns the result directly.
diff --git a/driver_hsqldb_test.go b/driver_hsqldb_test.go
index 39143ed..334f24a 100644
--- a/driver_hsqldb_test.go
+++ b/driver_hsqldb_test.go
@@ -24,6 +24,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 )
@@ -1255,6 +1256,125 @@ func TestHSQLDBMultipleSchemaSupport(t *testing.T) {
 	})
 }
 
+func TestHSQLDBExecBatch(t *testing.T) {
+	skipTestIfNotHSQLDB(t)
+
+	runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+		// batching=true makes stmt.Exec queue parameters; stmt.Close sends them via ExecuteBatchRequest.
+		// Create the table
+		dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+				int INTEGER PRIMARY KEY
+			    )`)
+
+		stmt, err := dbt.db.Prepare(`INSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		totalRows := 6
+
+		for i := 1; i <= totalRows; i++ {
+			_, err := stmt.Exec(i)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+		}
+
+		// Closing the statement sends the queued rows to the server
+		err = stmt.Close()
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+		defer queryStmt.Close()
+		var res int
+
+		for i := 1; i <= totalRows; i++ {
+
+			err := queryStmt.QueryRow(i).Scan(&res)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+
+			if res != i {
+				dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+			}
+		}
+	})
+}
+
+func TestHSQLDBExecBatchConcurrency(t *testing.T) {
+	skipTestIfNotHSQLDB(t)
+
+	runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+		// batching=true makes stmt.Exec queue parameters; stmt.Close sends them via ExecuteBatchRequest.
+		// Create the table
+		dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+				int INTEGER PRIMARY KEY
+			    )`)
+
+		stmt, err := dbt.db.Prepare(`INSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		totalRows := 6
+
+		var wg sync.WaitGroup
+		for i := 1; i <= totalRows; i++ {
+			wg.Add(1)
+			go func(num int) {
+				defer wg.Done()
+
+				_, err := stmt.Exec(num)
+				// Fatal (FailNow) may only run on the test goroutine; Error is safe from here.
+				if err != nil {
+					dbt.Error(err)
+				}
+			}(i)
+		}
+		wg.Wait()
+
+		// Closing the statement sends the queued rows to the server
+		err = stmt.Close()
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+		defer queryStmt.Close()
+		var res int
+
+		for i := 1; i <= totalRows; i++ {
+
+			err := queryStmt.QueryRow(i).Scan(&res)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+
+			if res != i {
+				dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+			}
+		}
+	})
+}
+
 // TODO: Test disabled due to CALCITE-1049
 /*func TestHSQLDBErrorCodeParsing(t *testing.T) {
 
diff --git a/driver_phoenix_test.go b/driver_phoenix_test.go
index 9b09d0b..c0df956 100644
--- a/driver_phoenix_test.go
+++ b/driver_phoenix_test.go
@@ -25,6 +25,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
@@ -1378,3 +1379,122 @@ func TestPhoenixErrorCodeParsing(t *testing.T) {
 		t.Errorf("Expected SQL state to be %s, got %s.", "42M03", resErr.SqlState)
 	}
 }
+
+func TestPhoenixExecBatch(t *testing.T) {
+	skipTestIfNotPhoenix(t)
+
+	runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+		// batching=true makes stmt.Exec queue parameters; stmt.Close sends them via ExecuteBatchRequest.
+		// Create the table
+		dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+				int INTEGER PRIMARY KEY
+			    ) TRANSACTIONAL=false`)
+
+		stmt, err := dbt.db.Prepare(`UPSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		totalRows := 6
+
+		for i := 1; i <= totalRows; i++ {
+			_, err := stmt.Exec(i)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+		}
+
+		// Closing the statement sends the queued rows to the server
+		err = stmt.Close()
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+		defer queryStmt.Close()
+		var res int
+
+		for i := 1; i <= totalRows; i++ {
+
+			err := queryStmt.QueryRow(i).Scan(&res)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+
+			if res != i {
+				dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+			}
+		}
+	})
+}
+
+func TestPhoenixExecBatchConcurrency(t *testing.T) {
+	skipTestIfNotPhoenix(t)
+
+	runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+		// batching=true makes stmt.Exec queue parameters; stmt.Close sends them via ExecuteBatchRequest.
+		// Create the table
+		dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+				int INTEGER PRIMARY KEY
+			    ) TRANSACTIONAL=false`)
+
+		stmt, err := dbt.db.Prepare(`UPSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		totalRows := 6
+
+		var wg sync.WaitGroup
+		for i := 1; i <= totalRows; i++ {
+			wg.Add(1)
+			go func(num int) {
+				defer wg.Done()
+
+				_, err := stmt.Exec(num)
+				// Fatal (FailNow) may only run on the test goroutine; Error is safe from here.
+				if err != nil {
+					dbt.Error(err)
+				}
+			}(i)
+		}
+		wg.Wait()
+
+		// Closing the statement sends the queued rows to the server
+		err = stmt.Close()
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+
+		queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+		if err != nil {
+			dbt.Fatal(err)
+		}
+		defer queryStmt.Close()
+		var res int
+
+		for i := 1; i <= totalRows; i++ {
+
+			err := queryStmt.QueryRow(i).Scan(&res)
+
+			if err != nil {
+				dbt.Fatal(err)
+			}
+
+			if res != i {
+				dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+			}
+		}
+	})
+}
diff --git a/dsn.go b/dsn.go
index a7a7ce1..4d54684 100644
--- a/dsn.go
+++ b/dsn.go
@@ -43,6 +43,7 @@ type Config struct {
 	location             *time.Location
 	schema               string
 	transactionIsolation uint32
+	batching             bool
 
 	authentication      authentication
 	avaticaUser         string
@@ -66,6 +67,7 @@ func ParseDSN(dsn string) (*Config, error) {
 		frameMaxSize:         -1,
 		location:             time.UTC,
 		transactionIsolation: 0,
+		batching:             false,
 	}
 
 	parsed, err := url.ParseRequestURI(dsn)
@@ -124,6 +126,12 @@ func ParseDSN(dsn string) (*Config, error) {
 		conf.transactionIsolation = uint32(isolation)
 	}
 
+	if v := queries.Get("batching"); v != "" {
+		if v == "true" {
+			conf.batching = true
+		}
+	}
+
 	if v := queries.Get("authentication"); v != "" {
 
 		auth := strings.ToUpper(v)
diff --git a/site/_docs/go_client_reference.md b/site/_docs/go_client_reference.md
index 2782294..84cbcd2 100644
--- a/site/_docs/go_client_reference.md
+++ b/site/_docs/go_client_reference.md
@@ -162,6 +162,35 @@ The supported values for `transactionIsolation` are:
 | 4     | `TRANSACTION_REPEATABLE_READ`  | Dirty reads and non-repeatable reads are prevented, but phantom reads may occur. |
 | 8     | `TRANSACTION_SERIALIZABLE`     | Dirty reads, non-repeatable reads, and phantom reads are all prevented.          |
 
+<strong><a name="batching" href="#batching">batching</a></strong>
+
+Set `batching=true` to write large amounts of data more quickly by spending less time on network round trips.
+With batching enabled, the driver uses [ExecuteBatchRequest](https://calcite.apache.org/avatica/docs/protobuf_reference.html#executebatchrequest)
+to send the parameters of many `Exec()` calls on a prepared statement as a batch, instead of one request per call.
+`Exec()` only queues its parameters; the statement is executed when `Close()` is called. `Exec()` is goroutine-safe.
+
+```go
+// when using Apache Phoenix; db was opened with a DSN that includes batching=true
+stmt, err := db.Prepare("UPSERT INTO my_table VALUES(?)")
+if err != nil {
+    log.Fatal(err)
+}
+var wg sync.WaitGroup
+for i := 1; i <= 6; i++ {
+    wg.Add(1)
+    go func(num int) {
+        defer wg.Done()
+        if _, err := stmt.Exec(num); err != nil {
+            log.Println(err)
+        }
+    }(i)
+}
+wg.Wait()
+
+// When batching=true, the prepared statement is only executed when Close() is called
+err = stmt.Close()
+```
+
 ## time.Time support
 
 The following datatypes are automatically converted to and from `time.Time`:
diff --git a/statement.go b/statement.go
index e7d3bc4..e72b7cf 100644
--- a/statement.go
+++ b/statement.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"database/sql/driver"
 	"math"
+	"sync"
 	"time"
 
 	"github.com/apache/calcite-avatica-go/v4/message"
@@ -28,10 +29,12 @@ import (
 )
 
 type stmt struct {
-	statementID uint32
-	conn        *conn
-	parameters  []*message.AvaticaParameter
-	handle      message.StatementHandle
+	statementID  uint32
+	conn         *conn
+	parameters   []*message.AvaticaParameter
+	handle       message.StatementHandle
+	batchUpdates []*message.UpdateBatch // queued by exec when batching; sent by Close
+	sync.Mutex                          // held by exec while appending to batchUpdates
 }
 
 // Close closes a statement
@@ -41,6 +44,17 @@ func (s *stmt) Close() error {
 		return driver.ErrBadConn
 	}
 
+	if s.conn.config.batching {
+		_, err := s.conn.httpClient.post(context.Background(), &message.ExecuteBatchRequest{
+			ConnectionId: s.conn.connectionId,
+			StatementId:  s.statementID,
+			Updates:      s.batchUpdates,
+		})
+		if err != nil {
+			return s.conn.avaticaErrorToResponseErrorOrError(err)
+		}
+	}
+
 	_, err := s.conn.httpClient.post(context.Background(), &message.CloseStatementRequest{
 		ConnectionId: s.conn.connectionId,
 		StatementId:  s.statementID,
@@ -79,9 +93,23 @@ func (s *stmt) exec(ctx context.Context, args []namedValue) (driver.Result, erro
 		return nil, driver.ErrBadConn
 	}
 
+	values := s.parametersToTypedValues(args)
+
+	if s.conn.config.batching {
+		s.Lock()
+		defer s.Unlock()
+
+		s.batchUpdates = append(s.batchUpdates, &message.UpdateBatch{
+			ParameterValues: values,
+		})
+		return &result{
+			affectedRows: -1,
+		}, nil
+	}
+
 	msg := &message.ExecuteRequest{
 		StatementHandle:    &s.handle,
-		ParameterValues:    s.parametersToTypedValues(args),
+		ParameterValues:    values,
 		FirstFrameMaxSize:  s.conn.config.frameMaxSize,
 		HasParameterValues: true,
 	}