You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2017/08/11 01:50:19 UTC
[22/50] calcite-avatica-go git commit: Implement context interfaces,
ping and transaction isolation levels
Implement context interfaces, ping and transaction isolation levels
Project: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/commit/261f94f7
Tree: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/tree/261f94f7
Diff: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/diff/261f94f7
Branch: refs/heads/master
Commit: 261f94f7fe833489b94e0d8ae34259adb0d20091
Parents: ee185cb
Author: Francis Chuang <fr...@boostport.com>
Authored: Tue Mar 7 10:26:06 2017 +1100
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Aug 10 18:47:10 2017 -0700
----------------------------------------------------------------------
compat_go18.go | 50 +++++++++++++++++++++++++++++++
connection.go | 35 +++++++++++++++++-----
connection_go18.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++++
statement.go | 22 ++++++++++----
statement_go18.go | 32 ++++++++++++++++++++
5 files changed, 205 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/compat_go18.go
----------------------------------------------------------------------
diff --git a/compat_go18.go b/compat_go18.go
new file mode 100644
index 0000000..8efd1b5
--- /dev/null
+++ b/compat_go18.go
@@ -0,0 +1,50 @@
+package avatica
+
+import (
+ "database/sql/driver"
+ "fmt"
+)
+
+// namedValue is the driver's internal representation of a statement
+// parameter. Its fields mirror driver.NamedValue so that a driver.NamedValue
+// can be converted directly with namedValue(nv); keep the field names, order
+// and types in sync with that type.
+type namedValue struct {
+ // Name is the parameter name; empty for positional parameters.
+ Name string
+ // Ordinal is the 1-based position of the parameter.
+ Ordinal int
+ // Value is the parameter value.
+ Value driver.Value
+}
+
+// driverValueToNamedValue wraps positional driver values in namedValue
+// structs. Each entry gets a 1-based Ordinal matching its position in values
+// and an empty Name. The returned slice is never nil.
+func driverValueToNamedValue(values []driver.Value) []namedValue {
+	converted := make([]namedValue, 0, len(values))
+
+	for idx := range values {
+		converted = append(converted, namedValue{
+			Ordinal: idx + 1,
+			Value:   values[idx],
+		})
+	}
+
+	return converted
+}
+
+// driverNamedValueToNamedValue converts the arguments handed over by
+// database/sql into the driver's internal namedValue representation.
+//
+// Named parameters are not supported: if any argument carries a non-empty
+// Name, a nil slice and an error naming that parameter are returned, so a
+// caller can never pick up a half-converted list.
+func driverNamedValueToNamedValue(values []driver.NamedValue) ([]namedValue, error) {
+	list := make([]namedValue, len(values))
+
+	for i, nv := range values {
+		// Reject before converting so nothing partial is ever returned.
+		if nv.Name != "" {
+			return nil, fmt.Errorf("named parameters are not supported: %s given", nv.Name)
+		}
+
+		list[i] = namedValue(nv)
+	}
+
+	return list, nil
+}
+
+// isoLevel is a transaction isolation level. conn.begin converts it to
+// uint32 and sends it as ConnectionProperties.TransactionIsolation.
+type isoLevel int32
+
+// NOTE(review): the non-negative values presumably mirror the
+// java.sql.Connection TRANSACTION_* constants used by Avatica; confirm
+// against the server before changing any of them.
+const (
+ // isolationUseCurrent is a driver-side sentinel: conn.begin replaces it
+ // with the isolation level from the connection configuration.
+ isolationUseCurrent isoLevel = -1
+ isolationNone isoLevel = 0
+ isolationReadUncommitted isoLevel = 1
+ // NOTE(review): "Comitted" is misspelled; renaming it also requires
+ // updating the reference in conn.BeginTx.
+ isolationReadComitted isoLevel = 2
+ isolationRepeatableRead isoLevel = 4
+ isolationSerializable isoLevel = 8
+)
http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/connection.go
----------------------------------------------------------------------
diff --git a/connection.go b/connection.go
index 1a3c9df..2149261 100644
--- a/connection.go
+++ b/connection.go
@@ -2,6 +2,7 @@ package avatica
import (
"database/sql/driver"
+
"github.com/Boostport/avatica/message"
"golang.org/x/net/context"
)
@@ -14,12 +15,15 @@ type conn struct {
// Prepare returns a prepared statement, bound to this connection.
func (c *conn) Prepare(query string) (driver.Stmt, error) {
+ return c.prepare(context.Background(), query)
+}
+func (c *conn) prepare(ctx context.Context, query string) (driver.Stmt, error) {
if c.connectionId == "" {
return nil, driver.ErrBadConn
}
- response, err := c.httpClient.post(context.Background(), &message.PrepareRequest{
+ response, err := c.httpClient.post(ctx, &message.PrepareRequest{
ConnectionId: c.connectionId,
Sql: query,
MaxRowsTotal: c.config.maxRowsTotal,
@@ -65,16 +69,24 @@ func (c *conn) Close() error {
// Begin starts and returns a new transaction.
func (c *conn) Begin() (driver.Tx, error) {
+ return c.begin(context.Background(), isolationUseCurrent)
+}
+
+func (c *conn) begin(ctx context.Context, isolationLevel isoLevel) (driver.Tx, error) {
if c.connectionId == "" {
return nil, driver.ErrBadConn
}
- _, err := c.httpClient.post(context.Background(), &message.ConnectionSyncRequest{
+ if isolationLevel == isolationUseCurrent {
+ isolationLevel = isoLevel(c.config.transactionIsolation)
+ }
+
+ _, err := c.httpClient.post(ctx, &message.ConnectionSyncRequest{
ConnectionId: c.connectionId,
ConnProps: &message.ConnectionProperties{
AutoCommit: false,
HasAutoCommit: true,
- TransactionIsolation: c.config.transactionIsolation,
+ TransactionIsolation: uint32(isolationLevel),
},
})
@@ -89,7 +101,11 @@ func (c *conn) Begin() (driver.Tx, error) {
// Exec prepares and executes a query and returns the result directly.
func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
+ list := driverValueToNamedValue(args)
+ return c.exec(context.Background(), query, list)
+}
+func (c *conn) exec(ctx context.Context, query string, args []namedValue) (driver.Result, error) {
if c.connectionId == "" {
return nil, driver.ErrBadConn
}
@@ -98,7 +114,7 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
return nil, driver.ErrSkip
}
- st, err := c.httpClient.post(context.Background(), &message.CreateStatementRequest{
+ st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
ConnectionId: c.connectionId,
})
@@ -106,7 +122,7 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
return nil, err
}
- res, err := c.httpClient.post(context.Background(), &message.PrepareAndExecuteRequest{
+ res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
ConnectionId: c.connectionId,
StatementId: st.(*message.CreateStatementResponse).StatementId,
Sql: query,
@@ -124,13 +140,16 @@ func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
return &result{
affectedRows: changed,
}, nil
-
}
// Query prepares and executes a query and returns the result directly.
// Query's optimizations are currently disabled due to CALCITE-1181.
func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
+ list := driverValueToNamedValue(args)
+ return c.query(context.Background(), query, list)
+}
+func (c *conn) query(ctx context.Context, query string, args []namedValue) (driver.Rows, error) {
if c.connectionId == "" {
return nil, driver.ErrBadConn
}
@@ -139,7 +158,7 @@ func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
return nil, driver.ErrSkip
}
- st, err := c.httpClient.post(context.Background(), &message.CreateStatementRequest{
+ st, err := c.httpClient.post(ctx, &message.CreateStatementRequest{
ConnectionId: c.connectionId,
})
@@ -147,7 +166,7 @@ func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
return nil, err
}
- res, err := c.httpClient.post(context.Background(), &message.PrepareAndExecuteRequest{
+ res, err := c.httpClient.post(ctx, &message.PrepareAndExecuteRequest{
ConnectionId: c.connectionId,
StatementId: st.(*message.CreateStatementResponse).StatementId,
Sql: query,
http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/connection_go18.go
----------------------------------------------------------------------
diff --git a/connection_go18.go b/connection_go18.go
new file mode 100644
index 0000000..a54a115
--- /dev/null
+++ b/connection_go18.go
@@ -0,0 +1,80 @@
+// The blank line after the build constraint is required: without it the go
+// tool treats the constraint as package documentation and ignores it.
+// +build go1.8
+
+package avatica
+
+import (
+ "database/sql/driver"
+
+ "errors"
+
+ "database/sql"
+ "fmt"
+
+ "golang.org/x/net/context"
+)
+
+func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
+
+ if opts.ReadOnly {
+ return nil, errors.New("Read-only transactions are not supported")
+ }
+
+ var isolation isoLevel
+
+ switch sql.IsolationLevel(opts.Isolation) {
+ case sql.LevelDefault:
+ isolation = isolationUseCurrent
+ case sql.LevelReadUncommitted:
+ isolation = isolationReadUncommitted
+ case sql.LevelReadCommitted:
+ isolation = isolationReadComitted
+ case sql.LevelWriteCommitted:
+ return nil, errors.New("LevelWriteCommitted isolation level is not supported")
+ case sql.LevelRepeatableRead:
+ isolation = isolationRepeatableRead
+ case sql.LevelSnapshot:
+ return nil, errors.New("LevelSnapshot isolation level is not supported")
+ case sql.LevelSerializable:
+ isolation = isolationSerializable
+ case sql.LevelLinearizable:
+ return nil, errors.New("LevelLinearizable isolation level is not supported")
+ default:
+ return nil, fmt.Errorf("Unsupported transaction isolation level: %d", opts.Isolation)
+ }
+
+ return c.begin(ctx, isolation)
+}
+
+// PrepareContext returns a prepared statement bound to this connection. It
+// passes ctx through to the prepare request.
+func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
+	prepared, err := c.prepare(ctx, query)
+
+	return prepared, err
+}
+
+// ExecContext prepares and executes query on this connection and returns the
+// result directly. Arguments carrying a name are rejected before anything is
+// sent, because named parameters are not supported.
+func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("could not execute statement: %s", convErr)
+	}
+
+	return c.exec(ctx, query, params)
+}
+
+// Ping checks that the connection is still usable by executing a trivial
+// statement against the server.
+//
+// driver.ErrBadConn is returned unwrapped: database/sql compares the error
+// from Ping against that sentinel to decide whether to drop the connection
+// from its pool, and wrapping it would hide the signal. Every other failure
+// is wrapped with a description of the ping.
+func (c *conn) Ping(ctx context.Context) error {
+	_, err := c.ExecContext(ctx, "SELECT 1;", []driver.NamedValue{})
+
+	if err == driver.ErrBadConn {
+		return err
+	}
+
+	if err != nil {
+		return fmt.Errorf("Error pinging database: %s", err)
+	}
+
+	return nil
+}
+
+// QueryContext prepares and executes query on this connection and returns
+// the resulting rows. Arguments carrying a name are rejected before anything
+// is sent, because named parameters are not supported.
+func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("could not execute query: %s", convErr)
+	}
+
+	return c.query(ctx, query, params)
+}
http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/statement.go
----------------------------------------------------------------------
diff --git a/statement.go b/statement.go
index 47d487f..1861a93 100644
--- a/statement.go
+++ b/statement.go
@@ -2,9 +2,10 @@ package avatica
import (
"database/sql/driver"
+ "time"
+
"github.com/Boostport/avatica/message"
"golang.org/x/net/context"
- "time"
)
type stmt struct {
@@ -45,12 +46,17 @@ func (s *stmt) NumInput() int {
// Exec executes a query that doesn't return rows, such
// as an INSERT or UPDATE.
func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
+ list := driverValueToNamedValue(args)
+ return s.exec(context.Background(), list)
+}
+
+func (s *stmt) exec(ctx context.Context, args []namedValue) (driver.Result, error) {
if s.conn.connectionId == "" {
return nil, driver.ErrBadConn
}
- res, err := s.conn.httpClient.post(context.Background(), &message.ExecuteRequest{
+ res, err := s.conn.httpClient.post(ctx, &message.ExecuteRequest{
StatementHandle: &s.handle,
ParameterValues: s.parametersToTypedValues(args),
FirstFrameMaxSize: uint64(s.conn.config.frameMaxSize), //TODO: Due to CALCITE-1353, if frameMaxSize == -1, it overflows to 18446744073709551615 due to the conversion to uint64, which is basically all rows.
@@ -72,12 +78,16 @@ func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
// Query executes a query that may return rows, such as a
// SELECT.
func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
+ list := driverValueToNamedValue(args)
+ return s.query(context.Background(), list)
+}
+func (s *stmt) query(ctx context.Context, args []namedValue) (driver.Rows, error) {
if s.conn.connectionId == "" {
return nil, driver.ErrBadConn
}
- res, err := s.conn.httpClient.post(context.Background(), &message.ExecuteRequest{
+ res, err := s.conn.httpClient.post(ctx, &message.ExecuteRequest{
StatementHandle: &s.handle,
ParameterValues: s.parametersToTypedValues(args),
FirstFrameMaxSize: uint64(s.conn.config.frameMaxSize), //TODO: Due to CALCITE-1353, if frameMaxSize == -1, it overflows to 18446744073709551615 due to the conversion to uint64, which is basically all rows.
@@ -94,18 +104,18 @@ func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
return newRows(s.conn, s.statementID, resultSet), nil
}
-func (s *stmt) parametersToTypedValues(vals []driver.Value) []*message.TypedValue {
+func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue {
result := []*message.TypedValue{}
for i, val := range vals {
typed := message.TypedValue{}
- if val == nil {
+ if val.Value == nil {
typed.Null = true
} else {
- switch v := val.(type) {
+ switch v := val.Value.(type) {
case int64:
typed.Type = message.Rep_LONG
typed.NumberValue = v
http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/261f94f7/statement_go18.go
----------------------------------------------------------------------
diff --git a/statement_go18.go b/statement_go18.go
new file mode 100644
index 0000000..e7ee65e
--- /dev/null
+++ b/statement_go18.go
@@ -0,0 +1,32 @@
+// The blank line after the build constraint is required: without it the go
+// tool treats the constraint as package documentation and ignores it.
+// +build go1.8
+
+package avatica
+
+import (
+ "database/sql/driver"
+
+ "fmt"
+
+ "golang.org/x/net/context"
+)
+
+// ExecContext executes the prepared statement with the given arguments for
+// a query that doesn't return rows, such as an INSERT or UPDATE. Arguments
+// carrying a name are rejected, because named parameters are not supported.
+func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("Error executing statement: %s", convErr)
+	}
+
+	return s.exec(ctx, params)
+}
+
+// QueryContext executes the prepared statement with the given arguments for
+// a query that may return rows, such as a SELECT. Arguments carrying a name
+// are rejected, because named parameters are not supported.
+func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
+	params, convErr := driverNamedValueToNamedValue(args)
+	if convErr != nil {
+		return nil, fmt.Errorf("Error executing query: %s", convErr)
+	}
+
+	return s.query(ctx, params)
+}