You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fr...@apache.org on 2020/06/17 03:29:51 UTC
[calcite-avatica-go] branch master updated: [CALCITE-4067] Add
support for ExecuteBatchRequest in prepared statement (chenhualin)
This is an automated email from the ASF dual-hosted git repository.
francischuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite-avatica-go.git
The following commit(s) were added to refs/heads/master by this push:
new 017ef8b [CALCITE-4067] Add support for ExecuteBatchRequest in prepared statement (chenhualin)
017ef8b is described below
commit 017ef8b95a4ce7489993e0fef14d20c596826f14
Author: geange <fa...@outlook.com>
AuthorDate: Tue Jun 16 10:48:42 2020 +0800
[CALCITE-4067] Add support for ExecuteBatchRequest in prepared statement (chenhualin)
---
class_mappings.go | 4 ++
connection.go | 14 ++---
driver_hsqldb_test.go | 120 ++++++++++++++++++++++++++++++++++++++
driver_phoenix_test.go | 120 ++++++++++++++++++++++++++++++++++++++
dsn.go | 8 +++
site/_docs/go_client_reference.md | 29 +++++++++
statement.go | 38 ++++++++++--
7 files changed, 320 insertions(+), 13 deletions(-)
diff --git a/class_mappings.go b/class_mappings.go
index 086af31..90420f0 100644
--- a/class_mappings.go
+++ b/class_mappings.go
@@ -56,6 +56,8 @@ func classNameFromRequest(message interface{}) string {
class = "DatabasePropertyRequest"
case *avaticaMessage.ExecuteRequest:
class = "ExecuteRequest"
+ case *avaticaMessage.ExecuteBatchRequest:
+ class = "ExecuteBatchRequest"
case *avaticaMessage.FetchRequest:
class = "FetchRequest"
case *avaticaMessage.OpenConnectionRequest:
@@ -104,6 +106,8 @@ func responseFromClassName(className string) (proto.Message, error) {
return &avaticaMessage.ErrorResponse{}, nil
case "ExecuteResponse":
return &avaticaMessage.ExecuteResponse{}, nil
+ case "ExecuteBatchResponse":
+ return &avaticaMessage.ExecuteBatchResponse{}, nil
case "FetchResponse":
return &avaticaMessage.FetchResponse{}, nil
case "OpenConnectionResponse":
diff --git a/connection.go b/connection.go
index 4eba2ef..d7d1d5b 100644
--- a/connection.go
+++ b/connection.go
@@ -20,7 +20,6 @@ package avatica
import (
"context"
"database/sql/driver"
-
"github.com/apache/calcite-avatica-go/v4/errors"
"github.com/apache/calcite-avatica-go/v4/message"
"golang.org/x/xerrors"
@@ -56,10 +55,11 @@ func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
prepareResponse := response.(*message.PrepareResponse)
return &stmt{
- statementID: prepareResponse.Statement.Id,
- conn: c,
- parameters: prepareResponse.Statement.Signature.Parameters,
- handle: *prepareResponse.Statement,
+ statementID: prepareResponse.Statement.Id,
+ conn: c,
+ parameters: prepareResponse.Statement.Signature.Parameters,
+ handle: *prepareResponse.Statement,
+ batchUpdates: make([]*message.UpdateBatch, 0),
}, nil
}
@@ -117,9 +117,7 @@ func (c *conn) begin(ctx context.Context, isolationLevel isoLevel) (driver.Tx, e
return nil, c.avaticaErrorToResponseErrorOrError(err)
}
- return &tx{
- conn: c,
- }, nil
+ return &tx{conn: c}, nil
}
// Exec prepares and executes a query and returns the result directly.
diff --git a/driver_hsqldb_test.go b/driver_hsqldb_test.go
index 39143ed..334f24a 100644
--- a/driver_hsqldb_test.go
+++ b/driver_hsqldb_test.go
@@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
+ "sync"
"testing"
"time"
)
@@ -1255,6 +1256,125 @@ func TestHSQLDBMultipleSchemaSupport(t *testing.T) {
})
}
+func TestHSQLDBExecBatch(t *testing.T) {
+ skipTestIfNotHSQLDB(t)
+
+ runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+
+ // Create and seed table
+ dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+ int INTEGER PRIMARY KEY
+ )`)
+
+ stmt, err := dbt.db.Prepare(`INSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ totalRows := 6
+
+ for i := 1; i <= totalRows; i++ {
+ _, err := stmt.Exec(i)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+ }
+
+ // When batching=true, after exec(sql), need to close the stmt
+ err = stmt.Close()
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ var res int
+
+ for i := 1; i <= totalRows; i++ {
+
+ err := queryStmt.QueryRow(i).Scan(&res)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ if res != i {
+ dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+ }
+ }
+ })
+}
+
+func TestHSQLDBExecBatchConcurrency(t *testing.T) {
+ skipTestIfNotHSQLDB(t)
+
+ runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+
+ // Create and seed table
+ dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+ int INTEGER PRIMARY KEY
+ )`)
+
+ stmt, err := dbt.db.Prepare(`INSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ totalRows := 6
+
+ var wg sync.WaitGroup
+ for i := 1; i <= totalRows; i++ {
+ wg.Add(1)
+ go func(num int) {
+ defer wg.Done()
+
+ _, err := stmt.Exec(num)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+ }(i)
+ }
+ wg.Wait()
+
+ // When batching=true, after exec(sql), need to close the stmt
+ err = stmt.Close()
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ var res int
+
+ for i := 1; i <= totalRows; i++ {
+
+ err := queryStmt.QueryRow(i).Scan(&res)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ if res != i {
+ dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+ }
+ }
+ })
+}
+
// TODO: Test disabled due to CALCITE-1049
/*func TestHSQLDBErrorCodeParsing(t *testing.T) {
diff --git a/driver_phoenix_test.go b/driver_phoenix_test.go
index 9b09d0b..c0df956 100644
--- a/driver_phoenix_test.go
+++ b/driver_phoenix_test.go
@@ -25,6 +25,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
+ "sync"
"testing"
"time"
@@ -1378,3 +1379,122 @@ func TestPhoenixErrorCodeParsing(t *testing.T) {
t.Errorf("Expected SQL state to be %s, got %s.", "42M03", resErr.SqlState)
}
}
+
+func TestPhoenixExecBatch(t *testing.T) {
+ skipTestIfNotPhoenix(t)
+
+ runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+
+ // Create and seed table
+ dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+ int INTEGER PRIMARY KEY
+ ) TRANSACTIONAL=false`)
+
+ stmt, err := dbt.db.Prepare(`UPSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ totalRows := 6
+
+ for i := 1; i <= totalRows; i++ {
+ _, err := stmt.Exec(i)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+ }
+
+ // When batching=true, after exec(sql), need to close the stmt
+ err = stmt.Close()
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ var res int
+
+ for i := 1; i <= totalRows; i++ {
+
+ err := queryStmt.QueryRow(i).Scan(&res)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ if res != i {
+ dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+ }
+ }
+ })
+}
+
+func TestPhoenixExecBatchConcurrency(t *testing.T) {
+ skipTestIfNotPhoenix(t)
+
+ runTests(t, dsn+"?batching=true", func(dbt *DBTest) {
+
+ // Create and seed table
+ dbt.mustExec(`CREATE TABLE ` + dbt.tableName + ` (
+ int INTEGER PRIMARY KEY
+ ) TRANSACTIONAL=false`)
+
+ stmt, err := dbt.db.Prepare(`UPSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ totalRows := 6
+
+ var wg sync.WaitGroup
+ for i := 1; i <= totalRows; i++ {
+ wg.Add(1)
+ go func(num int) {
+ defer wg.Done()
+
+ _, err := stmt.Exec(num)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+ }(i)
+ }
+ wg.Wait()
+
+ // When batching=true, after exec(sql), need to close the stmt
+ err = stmt.Close()
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ queryStmt, err := dbt.db.Prepare(`SELECT * FROM ` + dbt.tableName + ` WHERE int = ?`)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ var res int
+
+ for i := 1; i <= totalRows; i++ {
+
+ err := queryStmt.QueryRow(i).Scan(&res)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+
+ if res != i {
+ dbt.Fatalf("Unexpected query result. Expected %d, got %d.", i, res)
+ }
+ }
+ })
+}
diff --git a/dsn.go b/dsn.go
index a7a7ce1..4d54684 100644
--- a/dsn.go
+++ b/dsn.go
@@ -43,6 +43,7 @@ type Config struct {
location *time.Location
schema string
transactionIsolation uint32
+ batching bool
authentication authentication
avaticaUser string
@@ -66,6 +67,7 @@ func ParseDSN(dsn string) (*Config, error) {
frameMaxSize: -1,
location: time.UTC,
transactionIsolation: 0,
+ batching: false,
}
parsed, err := url.ParseRequestURI(dsn)
@@ -124,6 +126,12 @@ func ParseDSN(dsn string) (*Config, error) {
conf.transactionIsolation = uint32(isolation)
}
+ if v := queries.Get("batching"); v != "" {
+ if v == "true" {
+ conf.batching = true
+ }
+ }
+
if v := queries.Get("authentication"); v != "" {
auth := strings.ToUpper(v)
diff --git a/site/_docs/go_client_reference.md b/site/_docs/go_client_reference.md
index 2782294..84cbcd2 100644
--- a/site/_docs/go_client_reference.md
+++ b/site/_docs/go_client_reference.md
@@ -162,6 +162,35 @@ The supported values for `transactionIsolation` are:
| 4 | `TRANSACTION_REPEATABLE_READ` | Dirty reads and non-repeatable reads are prevented, but phantom reads may occur. |
| 8 | `TRANSACTION_SERIALIZABLE` | Dirty reads, non-repeatable reads, and phantom reads are all prevented. |
+<strong><a name="batching" href="#batching">batching</a></strong>
+
+Batching lets you write large amounts of data more quickly by spending less time on network round trips.
+By using [ExecuteBatchRequest](https://calcite.apache.org/avatica/docs/protobuf_reference.html#executebatchrequest),
+you can pack multiple updates into a single request instead of sending and confirming each one separately. When you set `batching=true`,
+the statement is only executed when `Close()` is called, and the statement is goroutine-safe.
+
+```go
+// when using phoenix
+stmt, _ := db.Prepare(`UPSERT INTO ` + dbt.tableName + ` VALUES(?)`)
+var wg sync.WaitGroup
+for i := 1; i <= 6; i++ {
+ wg.Add(1)
+ go func(num int) {
+ defer wg.Done()
+
+ _, err := stmt.Exec(num)
+
+ if err != nil {
+ dbt.Fatal(err)
+ }
+ }(i)
+}
+wg.Wait()
+
+// When batching=true, Statement will only be executed when Close() is called
+err = stmt.Close()
+```
+
## time.Time support
The following datatypes are automatically converted to and from `time.Time`:
diff --git a/statement.go b/statement.go
index e7d3bc4..e72b7cf 100644
--- a/statement.go
+++ b/statement.go
@@ -21,6 +21,7 @@ import (
"context"
"database/sql/driver"
"math"
+ "sync"
"time"
"github.com/apache/calcite-avatica-go/v4/message"
@@ -28,10 +29,12 @@ import (
)
type stmt struct {
- statementID uint32
- conn *conn
- parameters []*message.AvaticaParameter
- handle message.StatementHandle
+ statementID uint32
+ conn *conn
+ parameters []*message.AvaticaParameter
+ handle message.StatementHandle
+ batchUpdates []*message.UpdateBatch
+ sync.Mutex
}
// Close closes a statement
@@ -41,6 +44,17 @@ func (s *stmt) Close() error {
return driver.ErrBadConn
}
+ if s.conn.config.batching {
+ _, err := s.conn.httpClient.post(context.Background(), &message.ExecuteBatchRequest{
+ ConnectionId: s.conn.connectionId,
+ StatementId: s.statementID,
+ Updates: s.batchUpdates,
+ })
+ if err != nil {
+ return s.conn.avaticaErrorToResponseErrorOrError(err)
+ }
+ }
+
_, err := s.conn.httpClient.post(context.Background(), &message.CloseStatementRequest{
ConnectionId: s.conn.connectionId,
StatementId: s.statementID,
@@ -79,9 +93,23 @@ func (s *stmt) exec(ctx context.Context, args []namedValue) (driver.Result, erro
return nil, driver.ErrBadConn
}
+ values := s.parametersToTypedValues(args)
+
+ if s.conn.config.batching {
+ s.Lock()
+ defer s.Unlock()
+
+ s.batchUpdates = append(s.batchUpdates, &message.UpdateBatch{
+ ParameterValues: values,
+ })
+ return &result{
+ affectedRows: -1,
+ }, nil
+ }
+
msg := &message.ExecuteRequest{
StatementHandle: &s.handle,
- ParameterValues: s.parametersToTypedValues(args),
+ ParameterValues: values,
FirstFrameMaxSize: s.conn.config.frameMaxSize,
HasParameterValues: true,
}