You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2017/08/11 01:50:19 UTC

[22/50] calcite-avatica-go git commit: Implement context interfaces, ping and transaction isolation levels

Implement context interfaces, ping and transaction isolation levels


Project: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/commit/261f94f7
Tree: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/tree/261f94f7
Diff: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/diff/261f94f7

Branch: refs/heads/master
Commit: 261f94f7fe833489b94e0d8ae34259adb0d20091
Parents: ee185cb
Author: Francis Chuang <fr...@boostport.com>
Authored: Tue Mar 7 10:26:06 2017 +1100
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Aug 10 18:47:10 2017 -0700

----------------------------------------------------------------------
 compat_go18.go     | 50 +++++++++++++++++++++++++++++++
 connection.go      | 35 +++++++++++++++++-----
 connection_go18.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++++
 statement.go       | 22 ++++++++++----
 statement_go18.go  | 32 ++++++++++++++++++++
 5 files changed, 205 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/compat_go18.go
----------------------------------------------------------------------
diff --git a/compat_go18.go b/compat_go18.go
new file mode 100644
index 0000000..8efd1b5
--- /dev/null
+++ b/compat_go18.go
@@ -0,0 +1,50 @@
+package avatica
+
+import (
+	"database/sql/driver"
+	"fmt"
+)
+
+type namedValue struct { // same fields as driver.NamedValue, so the two convert directly
+	Name    string       // parameter name; empty for positional parameters
+	Ordinal int          // 1-based parameter position
+	Value   driver.Value // the bound parameter value
+}
+
+// driverValueToNamedValue wraps positional driver values as namedValues,
+// numbering them from 1 in argument order and leaving Name empty.
+func driverValueToNamedValue(values []driver.Value) []namedValue {
+	converted := make([]namedValue, 0, len(values))
+	for idx := range values {
+		converted = append(converted, namedValue{
+			Value:   values[idx],
+			Ordinal: idx + 1,
+		})
+	}
+	return converted
+}
+
+// driverNamedValueToNamedValue converts args to namedValues; a non-empty Name
+// (a named parameter) is unsupported and yields a nil list plus an error.
+// NOTE(review): driver.NamedValue needs Go 1.8, yet this file has no build tag.
+func driverNamedValueToNamedValue(values []driver.NamedValue) ([]namedValue, error) {
+	list := make([]namedValue, len(values))
+	for i, nv := range values {
+		if nv.Name != "" {
+			return nil, fmt.Errorf("named parameters are not supported: %s given", nv.Name)
+		}
+		list[i] = namedValue(nv)
+	}
+	return list, nil
+}
+
+type isoLevel int32 // isolation level; conn.begin sends it as the uint32 TransactionIsolation
+// NOTE(review): 0/1/2/4/8 presumably mirror JDBC's TRANSACTION_* codes — confirm.
+const (
+	isolationUseCurrent      isoLevel = -1 // sentinel: conn.begin substitutes the configured level
+	isolationNone            isoLevel = 0
+	isolationReadUncommitted isoLevel = 1
+	isolationReadComitted    isoLevel = 2 // sic: "Comitted" is the spelling callers reference
+	isolationRepeatableRead  isoLevel = 4
+	isolationSerializable    isoLevel = 8
+)

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/connection.go
----------------------------------------------------------------------
diff --git a/connection.go b/connection.go
index 1a3c9df..2149261 100644
--- a/connection.go
+++ b/connection.go
@@ -2,6 +2,7 @@ package avatica
 
 import (
 	"database/sql/driver"
+
 	"github.com/Boostport/avatica/message"
 	"golang.org/x/net/context"
 )
@@ -14,12 +15,15 @@ type conn struct {
 
 // Prepare returns a prepared statement, bound to this connection.
 func (c *conn) Prepare(query string) (driver.Stmt, error) {
+	return c.prepare(context.Background(), query)
+}
 
+func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
 	if c.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
 
-	response, err := c.httpClient.post(context.Background(), &message.PrepareRequest{
+	response, err := c.httpClient.post(ctx, &message.PrepareRequest{
 		ConnectionId: c.connectionId,
 		Sql:          query,
 		MaxRowsTotal: c.config.maxRowsTotal,
@@ -65,16 +69,24 @@ func (c *conn) Close() error {
 // Begin starts and returns a new transaction.
 func (c *conn) Begin() (driver.Tx, error) {
 
+	return c.begin(context.Background(), isolationUseCurrent)
+}
+
+func (c *conn) begin(ctx context.Context, isolationLevel isoLevel) (driver.Tx, error) {
 	if c.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
 
-	_, err := c.httpClient.post(context.Background(), &message.ConnectionSyncRequest{
+	if isolationLevel == isolationUseCurrent {
+		isolationLevel = isoLevel(c.config.transactionIsolation)
+	}
+
+	_, err := c.httpClient.post(ctx, &message.ConnectionSyncRequest{
 		ConnectionId: c.connectionId,
 		ConnProps: &message.ConnectionProperties{
 			AutoCommit:           false,
 			HasAutoCommit:        true,
-			TransactionIsolation: c.config.transactionIsolation,
+			TransactionIsolation: uint32(isolationLevel),
 		},
 	})
 
@@ -89,7 +101,11 @@ func (c *conn) Begin() (driver.Tx, error) {
 
 // Exec prepares and executes a query and returns the result directly.
 func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
+	list := driverValueToNamedValue(args)
+	return c.exec(context.Background(), query, list)
+}
 
+func (c *conn) exec(ctx context.Context, query string, args []namedValue) (driver.Result, error) {
 	if c.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
@@ -98,7 +114,7 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
 		return nil, driver.ErrSkip
 	}
 
-	st, err := c.httpClient.post(context.Background(), &message.CreateStatementRequest{
+	st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
 		ConnectionId: c.connectionId,
 	})
 
@@ -106,7 +122,7 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
 		return nil, err
 	}
 
-	res, err := c.httpClient.post(context.Background(), &message.PrepareAndExecuteRequest{
+	res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
 		ConnectionId:      c.connectionId,
 		StatementId:       st.(*message.CreateStatementResponse).StatementId,
 		Sql:               query,
@@ -124,13 +140,16 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
 	return &result{
 		affectedRows: changed,
 	}, nil
-
 }
 
 // Query prepares and executes a query and returns the result directly.
 // Query's optimizations are currently disabled due to CALCITE-1181.
 func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
+	list := driverValueToNamedValue(args)
+	return c.query(context.Background(), query, list)
+}
 
+func (c *conn) query(ctx context.Context, query string, args []namedValue) (driver.Rows, error) {
 	if c.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
@@ -139,7 +158,7 @@ func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
 		return nil, driver.ErrSkip
 	}
 
-	st, err := c.httpClient.post(context.Background(), &message.CreateStatementRequest{
+	st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
 		ConnectionId: c.connectionId,
 	})
 
@@ -147,7 +166,7 @@ func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
 		return nil, err
 	}
 
-	res, err := c.httpClient.post(context.Background(), &message.PrepareAndExecuteRequest{
+	res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
 		ConnectionId:      c.connectionId,
 		StatementId:       st.(*message.CreateStatementResponse).StatementId,
 		Sql:               query,

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/connection_go18.go
----------------------------------------------------------------------
diff --git a/connection_go18.go b/connection_go18.go
new file mode 100644
index 0000000..a54a115
--- /dev/null
+++ b/connection_go18.go
@@ -0,0 +1,80 @@
+// Context-aware conn methods built on database/sql/driver APIs from Go 1.8.
+// +build go1.8
+
+package avatica
+
+import (
+	"database/sql"
+	"database/sql/driver"
+	"errors"
+	"fmt"
+
+	"golang.org/x/net/context"
+)
+
+// BeginTx starts a transaction using the isolation level requested in opts,
+// forwarding ctx to the underlying request. Read-only transactions and
+// isolation levels without an isoLevel counterpart produce an error.
+func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
+	if opts.ReadOnly {
+		return nil, errors.New("Read-only transactions are not supported")
+	}
+
+	switch requested := sql.IsolationLevel(opts.Isolation); requested {
+	case sql.LevelDefault:
+		// Let begin fall back to the level configured on the connection.
+		return c.begin(ctx, isolationUseCurrent)
+	case sql.LevelReadUncommitted:
+		return c.begin(ctx, isolationReadUncommitted)
+	case sql.LevelReadCommitted:
+		return c.begin(ctx, isolationReadComitted)
+	case sql.LevelRepeatableRead:
+		return c.begin(ctx, isolationRepeatableRead)
+	case sql.LevelSerializable:
+		return c.begin(ctx, isolationSerializable)
+	// The remaining database/sql levels have no mapping and are refused.
+	case sql.LevelWriteCommitted:
+		return nil, errors.New("LevelWriteCommitted isolation level is not supported")
+	case sql.LevelSnapshot:
+		return nil, errors.New("LevelSnapshot isolation level is not supported")
+	case sql.LevelLinearizable:
+		return nil, errors.New("LevelLinearizable isolation level is not supported")
+	default:
+		return nil, fmt.Errorf("Unsupported transaction isolation level: %d", opts.Isolation)
+	}
+}
+
+func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
+	return c.prepare(ctx, query) // same path as Prepare, except the caller's ctx is forwarded to the request
+}
+
+// ExecContext runs query with positional args, passing ctx to the requests.
+// Named parameters are refused with an error before anything is sent.
+func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("could not execute statement: %s", convErr)
+	}
+	return c.exec(ctx, query, params)
+}
+
+// Ping checks the connection by executing "SELECT 1;". driver.ErrBadConn is
+// returned unwrapped so that database/sql can recognise a dead connection.
+func (c *conn) Ping(ctx context.Context) error {
+	_, err := c.ExecContext(ctx, "SELECT 1;", []driver.NamedValue{})
+	// Decorate only other failures; wrapping would hide the ErrBadConn sentinel.
+	if err != nil && err != driver.ErrBadConn {
+		return fmt.Errorf("Error pinging database: %s", err)
+	}
+	return err
+}
+
+// QueryContext runs query with positional args, passing ctx to the requests.
+// Named parameters are refused with an error before anything is sent.
+func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("could not execute query: %s", convErr)
+	}
+	return c.query(ctx, query, params)
+}

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/statement.go
----------------------------------------------------------------------
diff --git a/statement.go b/statement.go
index 47d487f..1861a93 100644
--- a/statement.go
+++ b/statement.go
@@ -2,9 +2,10 @@ package avatica
 
 import (
 	"database/sql/driver"
+	"time"
+
 	"github.com/Boostport/avatica/message"
 	"golang.org/x/net/context"
-	"time"
 )
 
 type stmt struct {
@@ -45,12 +46,17 @@ func (s *stmt) NumInput() int {
 // Exec executes a query that doesn't return rows, such
 // as an INSERT or UPDATE.
 func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
+	list := driverValueToNamedValue(args)
+	return s.exec(context.Background(), list)
+}
+
+func (s *stmt) exec(ctx context.Context, args []namedValue) (driver.Result, error) {
 
 	if s.conn.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
 
-	res, err := s.conn.httpClient.post(context.Background(), &message.ExecuteRequest{
+	res, err := s.conn.httpClient.post(ctx, &message.ExecuteRequest{
 		StatementHandle:    &s.handle,
 		ParameterValues:    s.parametersToTypedValues(args),
 		FirstFrameMaxSize:  uint64(s.conn.config.frameMaxSize), //TODO: Due to CALCITE-1353, if frameMaxSize == -1, it overflows to 18446744073709551615 due to the conversion to uint64, which is basically all rows.
@@ -72,12 +78,16 @@ func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
 // Query executes a query that may return rows, such as a
 // SELECT.
 func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
+	list := driverValueToNamedValue(args)
+	return s.query(context.Background(), list)
+}
 
+func (s *stmt) query(ctx context.Context, args []namedValue) (driver.Rows, error) {
 	if s.conn.connectionId == "" {
 		return nil, driver.ErrBadConn
 	}
 
-	res, err := s.conn.httpClient.post(context.Background(), &message.ExecuteRequest{
+	res, err := s.conn.httpClient.post(ctx, &message.ExecuteRequest{
 		StatementHandle:    &s.handle,
 		ParameterValues:    s.parametersToTypedValues(args),
 		FirstFrameMaxSize:  uint64(s.conn.config.frameMaxSize), //TODO: Due to CALCITE-1353, if frameMaxSize == -1, it overflows to 18446744073709551615 due to the conversion to uint64, which is basically all rows.
@@ -94,18 +104,18 @@ func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
 	return newRows(s.conn, s.statementID, resultSet), nil
 }
 
-func (s *stmt) parametersToTypedValues(vals []driver.Value) []*message.TypedValue {
+func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue {
 
 	result := []*message.TypedValue{}
 
 	for i, val := range vals {
 		typed := message.TypedValue{}
 
-		if val == nil {
+		if val.Value == nil {
 			typed.Null = true
 		} else {
 
-			switch v := val.(type) {
+			switch v := val.Value.(type) {
 			case int64:
 				typed.Type = message.Rep_LONG
 				typed.NumberValue = v

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/statement_go18.go
----------------------------------------------------------------------
diff --git a/statement_go18.go b/statement_go18.go
new file mode 100644
index 0000000..e7ee65e
--- /dev/null
+++ b/statement_go18.go
@@ -0,0 +1,32 @@
+// +build go1.8
+
+package avatica
+
+import (
+	"database/sql/driver"
+	"fmt"
+
+	"golang.org/x/net/context"
+)
+
+// ExecContext executes the prepared statement with positional args, passing
+// ctx to the request. Named parameters are refused with an error first.
+func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("Error executing statement: %s", convErr)
+	}
+
+	return s.exec(ctx, params)
+}
+
+// QueryContext runs the prepared statement as a query with positional args,
+// passing ctx to the request. Named parameters are refused with an error first.
+func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("Error executing query: %s", convErr)
+	}
+
+	return s.query(ctx, params)
+}