You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2017/08/11 01:50:16 UTC

[19/50] calcite-avatica-go git commit: Implement multiple result sets, row type information and support for all phoenix data types

Implement multiple result sets, row type information and support for all phoenix data types


Project: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/commit/ae26325d
Tree: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/tree/ae26325d
Diff: http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/diff/ae26325d

Branch: refs/heads/master
Commit: ae26325d3a2dd737a0fdf9d172b354122301f271
Parents: 261f94f
Author: Francis Chuang <fr...@boostport.com>
Authored: Wed Mar 8 15:37:59 2017 +1100
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Aug 10 18:47:10 2017 -0700

----------------------------------------------------------------------
 connection.go  |   7 +-
 driver_test.go |  27 ++++++-
 rows.go        | 215 ++++++++++++++++++++++++++++++++++++++--------------
 rows_go18.go   |  59 ++++++++++++++
 statement.go   |  18 +++--
 5 files changed, 255 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/ae26325d/connection.go
----------------------------------------------------------------------
diff --git a/connection.go b/connection.go
index 2149261..9e59701 100644
--- a/connection.go
+++ b/connection.go
@@ -134,7 +134,7 @@ func (c *conn) exec(ctx context.Context, query string, args []namedValue) (drive
 		return nil, err
 	}
 
-	// Currently there is only 1 ResultSet per response
+	// Currently there is only 1 ResultSet per response for exec
 	changed := int64(res.(*message.ExecuteResponse).Results[0].UpdateCount)
 
 	return &result{
@@ -178,8 +178,7 @@ func (c *conn) query(ctx context.Context, query string, args []namedValue) (driv
 		return nil, err
 	}
 
-	// Currently there is only 1 ResultSet per response
-	resultSet := res.(*message.ExecuteResponse).Results[0]
+	resultSets := res.(*message.ExecuteResponse).Results
 
-	return newRows(c, st.(*message.CreateStatementResponse).StatementId, resultSet), nil
+	return newRows(c, st.(*message.CreateStatementResponse).StatementId, resultSets), nil
 }

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/ae26325d/driver_test.go
----------------------------------------------------------------------
diff --git a/driver_test.go b/driver_test.go
index 017920b..f2035f7 100644
--- a/driver_test.go
+++ b/driver_test.go
@@ -5,13 +5,14 @@ import (
 	"crypto/sha256"
 	"database/sql"
 	"fmt"
-	"github.com/satori/go.uuid"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
 	"time"
+
+	"github.com/satori/go.uuid"
 )
 
 var (
@@ -187,10 +188,14 @@ func TestDataTypes(t *testing.T) {
 				uflt UNSIGNED_FLOAT,
 				dbl DOUBLE,
 				udbl UNSIGNED_DOUBLE,
+				dec DECIMAL,
 				bool BOOLEAN,
 				tm TIME,
 				dt DATE,
 				tmstmp TIMESTAMP,
+				utm UNSIGNED_TIME,
+				udt UNSIGNED_DATE,
+				utmstmp UNSIGNED_TIMESTAMP,
 				var VARCHAR,
 				ch CHAR(3),
 				bin BINARY(20),
@@ -210,10 +215,14 @@ func TestDataTypes(t *testing.T) {
 			ufltValue     float64   = 3.555
 			dblValue      float64   = -9.555
 			udblValue     float64   = 9.555
+			decValue      string    = "1.333"
 			booleanValue  bool      = true
 			tmValue       time.Time = time.Date(0, 1, 1, 21, 21, 21, 222000000, time.UTC)
 			dtValue       time.Time = time.Date(2100, 2, 1, 0, 0, 0, 0, time.UTC)
 			tmstmpValue   time.Time = time.Date(2100, 2, 1, 21, 21, 21, 222000000, time.UTC)
+			utmValue      time.Time = time.Date(0, 1, 1, 21, 21, 21, 222000000, time.UTC)
+			udtValue      time.Time = time.Date(2100, 2, 1, 0, 0, 0, 0, time.UTC)
+			utmstmpValue  time.Time = time.Date(2100, 2, 1, 21, 21, 21, 222000000, time.UTC)
 			varcharValue  string    = "test string"
 			chValue       string    = "a"
 			binValue      []byte    = make([]byte, 20, 20)
@@ -222,7 +231,7 @@ func TestDataTypes(t *testing.T) {
 
 		copy(binValue[:], "test")
 
-		dbt.mustExec(`UPSERT INTO `+dbt.tableName+` VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+		dbt.mustExec(`UPSERT INTO `+dbt.tableName+` VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
 			integerValue,
 			uintegerValue,
 			bintValue,
@@ -235,10 +244,14 @@ func TestDataTypes(t *testing.T) {
 			ufltValue,
 			dblValue,
 			udblValue,
+			decValue,
 			booleanValue,
 			tmValue,
 			dtValue,
 			tmstmpValue,
+			utmValue,
+			udtValue,
+			utmstmpValue,
 			varcharValue,
 			chValue,
 			binValue,
@@ -261,10 +274,14 @@ func TestDataTypes(t *testing.T) {
 			uflt     float64
 			dbl      float64
 			udbl     float64
+			dec      string
 			boolean  bool
 			tm       time.Time
 			dt       time.Time
 			tmstmp   time.Time
+			utm      time.Time
+			udt      time.Time
+			utmstmp  time.Time
 			varchar  string
 			ch       string
 			bin      []byte
@@ -273,7 +290,7 @@ func TestDataTypes(t *testing.T) {
 
 		for rows.Next() {
 
-			err := rows.Scan(&integer, &uinteger, &bint, &ulong, &tint, &utint, &sint, &usint, &flt, &uflt, &dbl, &udbl, &boolean, &tm, &dt, &tmstmp, &varchar, &ch, &bin, &varbin)
+			err := rows.Scan(&integer, &uinteger, &bint, &ulong, &tint, &utint, &sint, &usint, &flt, &uflt, &dbl, &udbl, &dec, &boolean, &tm, &dt, &tmstmp, &utm, &udt, &utmstmp, &varchar, &ch, &bin, &varbin)
 
 			if err != nil {
 				dbt.Fatal(err)
@@ -296,10 +313,14 @@ func TestDataTypes(t *testing.T) {
 			{uflt, ufltValue},
 			{dbl, dblValue},
 			{udbl, udblValue},
+			{dec, decValue},
 			{boolean, booleanValue},
 			{tm, tmValue},
 			{dt, dtValue},
 			{tmstmp, tmstmpValue},
+			{utm, utmValue},
+			{udt, udtValue},
+			{utmstmp, utmstmpValue},
 			{varchar, varcharValue},
 			{ch, chValue},
 			{bin, binValue},

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/ae26325d/rows.go
----------------------------------------------------------------------
diff --git a/rows.go b/rows.go
index 82b67ff..4d69ff7 100644
--- a/rows.go
+++ b/rows.go
@@ -2,21 +2,47 @@ package avatica
 
 import (
 	"database/sql/driver"
-	"github.com/Boostport/avatica/message"
-	"golang.org/x/net/context"
 	"io"
 	"time"
+
+	"reflect"
+
+	"math"
+
+	"fmt"
+
+	"github.com/Boostport/avatica/message"
+	"golang.org/x/net/context"
 )
 
+type precisionScale struct {
+	precision int64
+	scale     int64
+}
+
+type column struct {
+	name           string
+	typeName       string
+	rep            message.Rep
+	length         int64
+	nullable       bool
+	precisionScale *precisionScale
+	scanType       reflect.Type
+}
+
+type resultSet struct {
+	columns    []*column
+	done       bool
+	offset     uint64
+	data       [][]*message.TypedValue
+	currentRow int
+}
+
 type rows struct {
-	conn        *conn
-	statementID uint32
-	columnNames []string
-	columnTypes []message.Rep
-	done        bool
-	offset      uint64
-	data        [][]*message.TypedValue
-	currentRow  int
+	conn             *conn
+	statementID      uint32
+	resultSets       []*resultSet
+	currentResultSet int
 }
 
 // Columns returns the names of the columns. The number of
@@ -24,7 +50,14 @@ type rows struct {
 // slice.  If a particular column name isn't known, an empty
 // string should be returned for that entry.
 func (r *rows) Columns() []string {
-	return r.columnNames
+
+	cols := []string{}
+
+	for _, column := range r.resultSets[r.currentResultSet].columns {
+		cols = append(cols, column.name)
+	}
+
+	return cols
 }
 
 // Close closes the rows iterator.
@@ -45,9 +78,11 @@ func (r *rows) Close() error {
 // Next should return io.EOF when there are no more rows.
 func (r *rows) Next(dest []driver.Value) error {
 
-	if r.currentRow >= len(r.data) {
+	resultSet := r.resultSets[r.currentResultSet]
+
+	if resultSet.currentRow >= len(resultSet.data) {
 
-		if r.done {
+		if resultSet.done {
 			// Finished iterating through all results
 			return io.EOF
 		}
@@ -56,7 +91,7 @@ func (r *rows) Next(dest []driver.Value) error {
 		res, err := r.conn.httpClient.post(context.Background(), &message.FetchRequest{
 			ConnectionId: r.conn.connectionId,
 			StatementId:  r.statementID,
-			Offset:       r.offset,
+			Offset:       resultSet.offset,
 			FrameMaxSize: r.conn.config.frameMaxSize,
 		})
 
@@ -84,69 +119,135 @@ func (r *rows) Next(dest []driver.Value) error {
 			data = append(data, rowData)
 		}
 
-		r.done = frame.Done
-		r.data = data
-		r.currentRow = 0
+		resultSet.done = frame.Done
+		resultSet.data = data
+		resultSet.currentRow = 0
 
 	}
 
-	for i, val := range r.data[r.currentRow] {
-		dest[i] = typedValueToNative(r.columnTypes[i], val, r.conn.config)
+	for i, val := range resultSet.data[resultSet.currentRow] {
+		dest[i] = typedValueToNative(resultSet.columns[i].rep, val, r.conn.config)
 	}
 
-	r.currentRow++
+	resultSet.currentRow++
 
 	return nil
 }
 
 // newRows create a new set of rows from a result set.
-func newRows(conn *conn, statementID uint32, resultSet *message.ResultSetResponse) *rows {
-
-	columnNames := []string{}
-	columnTypes := []message.Rep{}
-
-	for _, col := range resultSet.Signature.Columns {
-		columnNames = append(columnNames, col.ColumnName)
-
-		// Special case for floats, date, time and timestamp
-		switch col.Type.Name {
-		case "FLOAT":
-			columnTypes = append(columnTypes, message.Rep_FLOAT)
-		case "UNSIGNED_FLOAT":
-			columnTypes = append(columnTypes, message.Rep_FLOAT)
-		case "TIME":
-			columnTypes = append(columnTypes, message.Rep_JAVA_SQL_TIME)
-		case "DATE":
-			columnTypes = append(columnTypes, message.Rep_JAVA_SQL_DATE)
-		case "TIMESTAMP":
-			columnTypes = append(columnTypes, message.Rep_JAVA_SQL_TIMESTAMP)
-		default:
-			columnTypes = append(columnTypes, col.Type.Rep)
+func newRows(conn *conn, statementID uint32, resultSets []*message.ResultSetResponse) *rows {
+
+	rsets := []*resultSet{}
+
+	for _, result := range resultSets {
+		columns := []*column{}
+
+		for _, col := range result.Signature.Columns {
+
+			column := &column{
+				name:     col.ColumnName,
+				typeName: col.Type.Name,
+				nullable: col.Nullable != 0,
+			}
+
+			// Handle precision and length
+			switch col.Type.Name {
+			case "DECIMAL":
+
+				precision := int64(col.Precision)
+
+				if precision == 0 {
+					precision = math.MaxInt64
+				}
+
+				scale := int64(col.Scale)
+
+				if scale == 0 {
+					scale = math.MaxInt64
+				}
+
+				column.precisionScale = &precisionScale{
+					precision: precision,
+					scale:     scale,
+				}
+			case "VARCHAR", "CHAR", "BINARY":
+				column.length = int64(col.Precision)
+			case "VARBINARY":
+				column.length = math.MaxInt64
+			}
+
+			// Handle scan types
+			switch col.Type.Name {
+			case "INTEGER", "UNSIGNED_INT", "BIGINT", "UNSIGNED_LONG", "TINYINT", "UNSIGNED_TINYINT", "SMALLINT", "UNSIGNED_SMALLINT":
+				column.scanType = reflect.TypeOf(int64(0))
+
+			case "FLOAT", "UNSIGNED_FLOAT", "DOUBLE", "UNSIGNED_DOUBLE":
+				column.scanType = reflect.TypeOf(float64(0))
+
+			case "DECIMAL", "VARCHAR", "CHAR":
+				column.scanType = reflect.TypeOf("")
+
+			case "BOOLEAN":
+				column.scanType = reflect.TypeOf(bool(false))
+
+			case "TIME", "DATE", "TIMESTAMP", "UNSIGNED_TIME", "UNSIGNED_DATE", "UNSIGNED_TIMESTAMP":
+				column.scanType = reflect.TypeOf(time.Time{})
+
+			case "BINARY", "VARBINARY":
+				column.scanType = reflect.TypeOf([]byte{})
+
+			default:
+				panic(fmt.Sprintf("scantype for %s is not implemented", col.Type.Name))
+			}
+
+			// Handle rep type special cases for decimals, floats, date, time and timestamp
+			switch col.Type.Name {
+			case "DECIMAL":
+				column.rep = message.Rep_BIG_DECIMAL
+			case "FLOAT":
+				column.rep = message.Rep_FLOAT
+			case "UNSIGNED_FLOAT":
+				column.rep = message.Rep_FLOAT
+			case "TIME", "UNSIGNED_TIME":
+				column.rep = message.Rep_JAVA_SQL_TIME
+			case "DATE", "UNSIGNED_DATE":
+				column.rep = message.Rep_JAVA_SQL_DATE
+			case "TIMESTAMP", "UNSIGNED_TIMESTAMP":
+				column.rep = message.Rep_JAVA_SQL_TIMESTAMP
+			default:
+				column.rep = col.Type.Rep
+			}
+
+			columns = append(columns, column)
 		}
-	}
 
-	frame := resultSet.FirstFrame
+		frame := result.FirstFrame
 
-	data := [][]*message.TypedValue{}
+		data := [][]*message.TypedValue{}
 
-	for _, row := range frame.Rows {
-		rowData := []*message.TypedValue{}
+		for _, row := range frame.Rows {
+			rowData := []*message.TypedValue{}
 
-		for _, col := range row.Value {
-			rowData = append(rowData, col.ScalarValue)
+			for _, col := range row.Value {
+				rowData = append(rowData, col.ScalarValue)
+			}
+
+			data = append(data, rowData)
 		}
 
-		data = append(data, rowData)
+		rsets = append(rsets, &resultSet{
+			columns: columns,
+			done:    frame.Done,
+			offset:  frame.Offset,
+			data:    data,
+		})
 	}
 
 	return &rows{
-		conn:        conn,
-		statementID: statementID,
-		columnNames: columnNames,
-		columnTypes: columnTypes,
-		done:        frame.Done,
-		offset:      frame.Offset,
-		data:        data,
+		conn:             conn,
+		statementID:      statementID,
+		resultSets:       rsets,
+		currentResultSet: 0,
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/ae26325d/rows_go18.go
----------------------------------------------------------------------
diff --git a/rows_go18.go b/rows_go18.go
new file mode 100644
index 0000000..9a783c2
--- /dev/null
+++ b/rows_go18.go
@@ -0,0 +1,59 @@
+// +build go1.8
+package avatica
+
+import (
+	"io"
+	"reflect"
+)
+
+func (r *rows) HasNextResultSet() bool {
+	lastResultSetID := len(r.resultSets) - 1
+	return lastResultSetID > r.currentResultSet
+}
+
+func (r *rows) NextResultSet() error {
+
+	lastResultSetID := len(r.resultSets) - 1
+
+	if r.currentResultSet+1 > lastResultSetID {
+		return io.EOF
+	}
+
+	r.currentResultSet++
+
+	return nil
+}
+
+func (r *rows) ColumnTypeDatabaseTypeName(index int) string {
+
+	return r.resultSets[r.currentResultSet].columns[index].typeName
+}
+
+func (r *rows) ColumnTypeLength(index int) (length int64, ok bool) {
+	l := r.resultSets[r.currentResultSet].columns[index].length
+
+	if l == 0 {
+		return 0, false
+	}
+
+	return l, true
+}
+
+func (r *rows) ColumnTypeNullable(index int) (nullable, ok bool) {
+	return r.resultSets[r.currentResultSet].columns[index].nullable, true
+}
+
+func (r *rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
+
+	ps := r.resultSets[r.currentResultSet].columns[index].precisionScale
+
+	if ps != nil {
+		return ps.precision, ps.scale, true
+	}
+
+	return 0, 0, false
+}
+
+func (r *rows) ColumnTypeScanType(index int) reflect.Type {
+	return r.resultSets[r.currentResultSet].columns[index].scanType
+}

http://git-wip-us.apache.org/repos/asf/calcite-avatica-go/blob/ae26325d/statement.go
----------------------------------------------------------------------
diff --git a/statement.go b/statement.go
index 1861a93..3e6b64a 100644
--- a/statement.go
+++ b/statement.go
@@ -98,8 +98,7 @@ func (s *stmt) query(ctx context.Context, args []namedValue) (driver.Rows, error
 		return nil, err
 	}
 
-	// Currently there is only 1 ResultSet per response
-	resultSet := res.(*message.ExecuteResponse).Results[0]
+	resultSet := res.(*message.ExecuteResponse).Results
 
 	return newRows(s.conn, s.statementID, resultSet), nil
 }
@@ -110,7 +109,6 @@ func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue
 
 	for i, val := range vals {
 		typed := message.TypedValue{}
-
 		if val.Value == nil {
 			typed.Null = true
 		} else {
@@ -129,13 +127,19 @@ func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue
 				typed.Type = message.Rep_BYTE_STRING
 				typed.BytesValue = v
 			case string:
-				typed.Type = message.Rep_STRING
+
+				if s.parameters[i].TypeName == "DECIMAL" {
+					typed.Type = message.Rep_BIG_DECIMAL
+				} else {
+					typed.Type = message.Rep_STRING
+				}
 				typed.StringValue = v
+
 			case time.Time:
 				avaticaParameter := s.parameters[i]
 
 				switch avaticaParameter.TypeName {
-				case "TIME":
+				case "TIME", "UNSIGNED_TIME":
 					typed.Type = message.Rep_JAVA_SQL_TIME
 
 					// Because a location can have multiple time zones due to daylight savings,
@@ -146,7 +150,7 @@ func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue
 					base := time.Date(v.Year(), v.Month(), v.Day(), 0, 0, 0, 0, time.FixedZone(zone, offset))
 					typed.NumberValue = int64(v.Sub(base).Nanoseconds() / int64(time.Millisecond))
 
-				case "DATE":
+				case "DATE", "UNSIGNED_DATE":
 					typed.Type = message.Rep_JAVA_SQL_DATE
 
 					// Because a location can have multiple time zones due to daylight savings,
@@ -157,7 +161,7 @@ func (s *stmt) parametersToTypedValues(vals []namedValue) []*message.TypedValue
 					base := time.Date(1970, 1, 1, 0, 0, 0, 0, time.FixedZone(zone, offset))
 					typed.NumberValue = int64(v.Sub(base) / (24 * time.Hour))
 
-				case "TIMESTAMP":
+				case "TIMESTAMP", "UNSIGNED_TIMESTAMP":
 					typed.Type = message.Rep_JAVA_SQL_TIMESTAMP
 
 					// Because a location can have multiple time zones due to daylight savings,