You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/12/20 06:07:57 UTC
[iotdb-client-go] branch main updated: iotdb-client-go (#3)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/main by this push:
new ca8e97c iotdb-client-go (#3)
ca8e97c is described below
commit ca8e97c62cce662a421791b8cf0d5066f0ae468e
Author: Mark Liu <ma...@gmail.com>
AuthorDate: Sun Dec 20 14:07:50 2020 +0800
iotdb-client-go (#3)
iotdb-go-client second version
---
.gitignore | 2 +
LICENSE-binary | 1 -
Makefile | 25 ++
README.md | 29 +-
README_ZH.md | 30 +-
client/{rowrecord.go => errors.go} | 33 +-
client/field.go | 226 +++--------
client/field_test.go | 460 ++++++++++++++++++++++
client/get_session.go | 71 ----
client/protocol.go | 183 +++++----
client/rowrecord.go | 16 +-
client/rowrecord_test.go | 96 +++++
client/rpc_dataset.go | 252 ------------
client/rpcdataset.go | 561 ++++++++++++++++++++++++++
client/rpcdataset_test.go | 618 +++++++++++++++++++++++++++++
client/session.go | 785 +++++++++++++++----------------------
client/session_dataset.go | 156 --------
client/sessiondataset.go | 115 ++++++
client/tablet.go | 256 ++++++------
client/tablet_test.go | 308 +++++++++++++++
client/utils.go | 106 +++++
client/utils_test.go | 308 +++++++++++++++
example/session_example.go | 474 ++++++++++++++--------
go.mod | 6 +-
go.sum | 1 +
test/session_test.go | 579 ---------------------------
26 files changed, 3608 insertions(+), 2089 deletions(-)
diff --git a/.gitignore b/.gitignore
index 0f99227..ed34fc7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,8 @@ output/
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
+# Dependency directories (remove the comment below to include it)
+# vendor/
**/*.pid
diff --git a/LICENSE-binary b/LICENSE-binary
index 7961354..4c5ac5d 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -216,4 +216,3 @@ following license. See licenses/ for text of these licenses.
Apache Software Foundation License 2.0
--------------------------------------
github.com/apache/thrift@v0.13.0
-
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..12a1d7c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+all: generate
+
+generate:
+ @curl -ss -o rpc.thrift https://raw.githubusercontent.com/apache/iotdb/master/thrift/src/main/thrift/rpc.thrift
+ @thrift -out . -gen go rpc.thrift
+ @rm -rf rpc/t_s_i_service-remote rpc.thrift
+
+.PHONY: generate all
diff --git a/README.md b/README.md
index 4de2a98..1a796a0 100644
--- a/README.md
+++ b/README.md
@@ -37,9 +37,32 @@ Apache IoTDB Github: https://github.com/apache/iotdb
```
Create project eg:session
-*Enter project directory eg:cd session
-*Download dependency : go get github.com/apache/iotdb-client-go
+With go mod
-*eg:https://github.com/apache/iotdb-client-go/example/session_example.go
+```sh
+export GO111MODULE=on
+export GOPROXY=https://goproxy.io
+
+mkdir session_example && cd session_example
+
+curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
+
+go mod init session_example
+go run session_example.go
+```
+
+Without go mod
+
+```sh
+# get thrift 0.13.0
+go get github.com/apache/thrift
+cd $GOPATH/src/github.com/apache/thrift
+git checkout 0.13.0
+
+mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
+cd $GOPATH/src/iotdb-client-go-example/session_example
+
+curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
+go run session_example.go
```
diff --git a/README_ZH.md b/README_ZH.md
index 41bd8b9..1df9a16 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -32,12 +32,34 @@ Apache IoTDB Github: https://github.com/apache/iotdb
# 如何编译
# 如何使用 (快速上手)
+
+使用go mod
+
+```sh
+export GO111MODULE=on
+export GOPROXY=https://goproxy.io
+
+mkdir session_example && cd session_example
+
+curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
+
+go mod init session_example
+go run session_example.go
```
-*创建项目 例:session
-*进入项目的文件夹 例:cd session
+不使用go mod,采用GOPATH
+
+```sh
+# get thrift 0.13.0
+go get github.com/apache/thrift
+cd $GOPATH/src/github.com/apache/thrift
+git checkout 0.13.0
-*下载依赖 : go get github.com/apache/iotdb-client-go
+mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
+cd $GOPATH/src/iotdb-client-go-example/session_example
-*示例 :https://github.com/apache/iotdb-client-go/example/session_example.go
+
+curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
+go run session_example.go
```
diff --git a/client/rowrecord.go b/client/errors.go
similarity index 60%
copy from client/rowrecord.go
copy to client/errors.go
index a42b1d2..17c6a60 100644
--- a/client/rowrecord.go
+++ b/client/errors.go
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,11 +19,30 @@
package client
-type RowRecord struct {
- Timestamp int64
- Fields []Field
+import (
+ "bytes"
+
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+type BatchError struct {
+ statuses []*rpc.TSStatus
+}
+
+func (e *BatchError) Error() string {
+ buff := bytes.Buffer{}
+ for _, status := range e.statuses {
+ buff.WriteString(*status.Message + ";")
+ }
+ return buff.String()
+}
+
+func (e *BatchError) GetStatuses() []*rpc.TSStatus {
+ return e.statuses
}
-func (r *RowRecord) AddField(field Field) {
- r.Fields[len(r.Fields)] = field
+func NewBatchError(statuses []*rpc.TSStatus) *BatchError {
+ return &BatchError{
+ statuses: statuses,
+ }
}
diff --git a/client/field.go b/client/field.go
index 38dc406..3e79445 100644
--- a/client/field.go
+++ b/client/field.go
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,198 +19,76 @@
package client
-import (
- "bytes"
- "encoding/binary"
- "errors"
-)
-
type Field struct {
- DataType string
- boolV bool
- intV int32
- longV int64
- floatV float32
- doubleV float64
- binaryV []byte
-}
-
-func Copy(field Field) (Field, error) {
- outField := NewField(field.DataType)
- if outField.DataType != "" {
- dateType := outField.DataType
- switch dateType {
- case "BOOLEAN":
- outField.SetBoolV(field.GetBoolV())
- break
- case "INT32":
- outField.SetIntV(field.GetIntV())
- break
- case "INT64":
- outField.SetLongV(field.GetLongV())
- break
- case "FLOAT":
- outField.SetFloatV(field.GetFloatV())
- break
- case "DOUBLE":
- outField.SetDoubleV(field.GetDoubleV())
- break
- case "TEXT":
- outField.SetBinaryV(field.GetBinaryV())
- break
- default:
- return outField, errors.New("unsupported data type " + dateType)
- }
- }
- return outField, nil
-}
-
-func NewField(DataType string) Field {
- return Field{DataType: DataType}
-}
-
-func (f *Field) GetBoolV() bool {
- return f.boolV
-}
-
-func (f *Field) SetBoolV(boolVal bool) {
- f.boolV = boolVal
-}
-
-func (f *Field) GetBinaryV() []byte {
- return f.binaryV
-}
-
-func (f *Field) SetBinaryV(binaryVal []byte) {
- f.binaryV = binaryVal
-}
-
-func (f *Field) GetFloatV() float32 {
- return f.floatV
-}
-
-func (f *Field) SetFloatV(floatVal float32) {
- f.floatV = floatVal
-}
-
-func (f *Field) GetDoubleV() float64 {
- return f.doubleV
+ dataType TSDataType
+ name string
+ value interface{}
}
-func (f *Field) SetDoubleV(doubleVal float64) {
- f.doubleV = doubleVal
+func (f *Field) IsNull() bool {
+ return f.value == nil
}
-func (f *Field) GetIntV() int32 {
- return f.intV
+func (f *Field) GetDataType() TSDataType {
+ return f.dataType
}
-func (f *Field) SetIntV(intVal int32) {
- f.intV = intVal
+func (f *Field) GetValue() interface{} {
+ return f.value
}
-func (f *Field) GetLongV() int64 {
- return f.longV
+func (f *Field) GetInt32() int32 {
+ if f.value == nil {
+ return 0
+ }
+ return f.value.(int32)
}
-func (f *Field) SetLongV(longVal int64) {
- f.longV = longVal
+func (f *Field) GetInt64() int64 {
+ if f.value == nil {
+ return 0
+ }
+ return f.value.(int64)
}
-func (f *Field) ToString() (string, error) {
- return f.GetStringValue()
+func (f *Field) GetFloat32() float32 {
+ if f.value == nil {
+ return 0
+ }
+ return f.value.(float32)
}
-func (f *Field) GetStringValue() (string, error) {
- if f.DataType == "" {
- return "", nil
- }
- dateType := f.DataType
- buf := bytes.NewBuffer([]byte{})
- switch dateType {
- case "BOOLEAN":
- binary.Write(buf, binary.BigEndian, &f.boolV)
- break
- case "INT32":
- binary.Write(buf, binary.BigEndian, &f.intV)
- break
- case "INT64":
- binary.Write(buf, binary.BigEndian, &f.longV)
- break
- case "FLOAT":
- binary.Write(buf, binary.BigEndian, &f.floatV)
- break
- case "DOUBLE":
- binary.Write(buf, binary.BigEndian, &f.doubleV)
- break
- case "TEXT":
- buf.Write(f.binaryV)
- break
- default:
- return "", errors.New("unsupported data type " + dateType)
+func (f *Field) GetFloat64() float64 {
+ if f.value == nil {
+ return 0
}
- return buf.String(), nil
+ return f.value.(float64)
}
-func (f *Field) GetField(value interface{}, dateType string) (*Field, error) {
- if value == nil {
- return nil, nil
- }
- field := NewField(dateType)
- switch dateType {
- case "BOOLEAN":
- field.SetBoolV(value.(bool))
- break
- case "INT32":
- field.SetIntV(int32(value.(int)))
- break
- case "INT64":
- field.SetLongV(int64(value.(int)))
- break
- case "FLOAT":
- field.SetFloatV(float32(value.(float64)))
- break
- case "DOUBLE":
- field.SetDoubleV(value.(float64))
- break
- case "TEXT":
- field.SetBinaryV([]byte(value.(string)))
- break
- default:
- return &field, errors.New("unsupported data type " + dateType)
- }
- return &field, nil
+func (f *Field) GetName() string {
+ return f.name
}
-func (f *Field) GetObjectValue(dateType string) (interface{}, error) {
- if f.DataType == "" {
- return nil, nil
+func (f *Field) GetText() string {
+ if f.value == nil {
+ return ""
}
- switch dateType {
- case "BOOLEAN":
- return f.GetBoolV(), nil
- break
- case "INT32":
- return f.GetIntV(), nil
- break
- case "INT64":
- return f.GetLongV(), nil
- break
- case "FLOAT":
- return f.GetFloatV(), nil
- break
- case "DOUBLE":
- return f.GetDoubleV(), nil
- break
- case "TEXT":
- return f.GetBinaryV(), nil
- break
- default:
- return nil, errors.New("unsupported data type " + dateType)
+ switch f.value.(type) {
+ case bool:
+ if f.value.(bool) {
+ return "true"
+ }
+ return "false"
+ case int32:
+ return int32ToString(f.value.(int32))
+ case int64:
+ return int64ToString(f.value.(int64))
+ case float32:
+ return float32ToString(f.value.(float32))
+ case float64:
+ return float64ToString(f.value.(float64))
+ case string:
+ return f.value.(string)
}
- return nil, nil
-}
-
-func (f *Field) IsNull() bool {
- return f.DataType == ""
+ return ""
}
diff --git a/client/field_test.go b/client/field_test.go
new file mode 100644
index 0000000..859d8d5
--- /dev/null
+++ b/client/field_test.go
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestField_IsNull(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want bool
+ }{
+ {
+ name: "IsNull-1",
+ fields: fields{
+ dataType: 0,
+ name: "",
+ value: nil,
+ },
+ want: true,
+ }, {
+ name: "IsNull-2",
+ fields: fields{
+ dataType: 0,
+ name: "",
+ value: 1,
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.IsNull(); got != tt.want {
+ t.Errorf("Field.IsNull() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetDataType(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want TSDataType
+ }{
+ {
+ name: "GetDataType-BOOLEAN",
+ fields: fields{
+ dataType: BOOLEAN,
+ name: "",
+ value: nil,
+ },
+ want: BOOLEAN,
+ }, {
+ name: "GetDataType-INT32",
+ fields: fields{
+ dataType: INT32,
+ name: "",
+ value: nil,
+ },
+ want: INT32,
+ }, {
+ name: "GetDataType-INT64",
+ fields: fields{
+ dataType: INT64,
+ name: "",
+ value: nil,
+ },
+ want: INT64,
+ }, {
+ name: "GetDataType-FLOAT",
+ fields: fields{
+ dataType: FLOAT,
+ name: "",
+ value: nil,
+ },
+ want: FLOAT,
+ }, {
+ name: "GetDataType-DOUBLE",
+ fields: fields{
+ dataType: DOUBLE,
+ name: "",
+ value: nil,
+ },
+ want: DOUBLE,
+ }, {
+ name: "GetDataType-TEXT",
+ fields: fields{
+ dataType: TEXT,
+ name: "",
+ value: nil,
+ },
+ want: TEXT,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetDataType(); got != tt.want {
+ t.Errorf("Field.GetDataType() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetValue(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want interface{}
+ }{
+ {
+ name: "GetValue-BOOLEAN",
+ fields: fields{
+ dataType: BOOLEAN,
+ name: "",
+ value: true,
+ },
+ want: true,
+ }, {
+ name: "GetValue-INT32",
+ fields: fields{
+ dataType: INT32,
+ name: "",
+ value: int32(65535),
+ },
+ want: int32(65535),
+ }, {
+ name: "GetValue-INT64",
+ fields: fields{
+ dataType: INT64,
+ name: "",
+ value: int64(65535),
+ },
+ want: int64(65535),
+ }, {
+ name: "GetValue-FLOAT",
+ fields: fields{
+ dataType: FLOAT,
+ name: "",
+ value: float32(32.768),
+ },
+ want: float32(32.768),
+ }, {
+ name: "GetValue-DOUBLE",
+ fields: fields{
+ dataType: DOUBLE,
+ name: "",
+ value: float64(32.768),
+ },
+ want: float64(32.768),
+ }, {
+ name: "GetValue-TEXT",
+ fields: fields{
+ dataType: TEXT,
+ name: "",
+ value: "TEXT",
+ },
+ want: "TEXT",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetValue(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Field.GetValue() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetInt32(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want int32
+ }{
+ {
+ name: "GetInt32-01",
+ fields: fields{
+ dataType: INT32,
+ name: "",
+ value: int32(65535),
+ },
+ want: 65535,
+ }, {
+ name: "GetInt32-02",
+ fields: fields{
+ dataType: INT32,
+ name: "restart_count",
+ value: nil,
+ },
+ want: 0,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetInt32(); got != tt.want {
+ t.Errorf("Field.GetInt32() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetInt64(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want int64
+ }{
+ {
+ name: "GetInt64-01",
+ fields: fields{
+ dataType: INT64,
+ name: "",
+ value: int64(65535),
+ },
+ want: 65535,
+ }, {
+ name: "GetInt64-02",
+ fields: fields{
+ dataType: INT64,
+ name: "tickCount",
+ value: nil,
+ },
+ want: 0,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetInt64(); got != tt.want {
+ t.Errorf("Field.GetInt64() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetFloat32(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want float32
+ }{
+ {
+ name: "GetFloat32",
+ fields: fields{
+ dataType: FLOAT,
+ name: "",
+ value: float32(32.768),
+ },
+ want: 32.768,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetFloat32(); got != tt.want {
+ t.Errorf("Field.GetFloat32() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetFloat64(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want float64
+ }{
+ {
+ name: "GetFloat64",
+ fields: fields{
+ dataType: DOUBLE,
+ name: "",
+ value: float64(32.768),
+ },
+ want: 32.768,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetFloat64(); got != tt.want {
+ t.Errorf("Field.GetFloat64() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_GetText(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "GetText-01",
+ fields: fields{
+ dataType: TEXT,
+ name: "",
+ value: "32.768",
+ },
+ want: "32.768",
+ }, {
+ name: "GetText-02",
+ fields: fields{
+ dataType: TEXT,
+ name: "",
+ value: nil,
+ },
+ want: "",
+ }, {
+ name: "GetText-03",
+ fields: fields{
+ dataType: INT32,
+ name: "",
+ value: int32(1),
+ },
+ want: "1",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetText(); got != tt.want {
+ t.Errorf("Field.GetText() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestField_getName(t *testing.T) {
+ type fields struct {
+ dataType TSDataType
+ name string
+ value interface{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "GetName",
+ fields: fields{
+ dataType: TEXT,
+ name: "temperature",
+ value: float32(32),
+ },
+ want: "temperature",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &Field{
+ dataType: tt.fields.dataType,
+ name: tt.fields.name,
+ value: tt.fields.value,
+ }
+ if got := f.GetName(); got != tt.want {
+ t.Errorf("Field.GetName() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/client/get_session.go b/client/get_session.go
deleted file mode 100644
index fbbbdb7..0000000
--- a/client/get_session.go
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package client
-
-import (
- "github.com/apache/iotdb-client-go/rpc"
- "github.com/apache/thrift/lib/go/thrift"
-)
-
-const (
- DefaultUser = "root"
- DefaultPasswd = "root"
- DefaultZoneId = "Asia/Shanghai"
- DefaultFetchSize int32 = 10000
-)
-
-/*
- *session config
- *Host: server ip
- *Port: server port
- *User user name
- *Passwd user passwd
- *FetchSize int32
- *ZoneId string
- */
-type Config struct {
- Host string
- Port string
- User string
- Passwd string
- FetchSize int32
- ZoneId string
-}
-
-type Session struct {
- config *Config
- client *rpc.TSIServiceClient
- sessionId int64
- trans thrift.TTransport
- requestStatementId int64
-}
-
-func NewConfig() *Config {
- return &Config{
- User: DefaultUser,
- Passwd: DefaultPasswd,
- FetchSize: DefaultFetchSize,
- ZoneId: DefaultZoneId,
- }
-}
-
-func NewSession(config *Config) *Session {
- return &Session{config: config}
-}
diff --git a/client/protocol.go b/client/protocol.go
index dee1ddc..fcae415 100644
--- a/client/protocol.go
+++ b/client/protocol.go
@@ -1,13 +1,13 @@
-/**
- * Licensed to the Apache Software Foundation =ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 =the
+ * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,101 +19,110 @@
package client
+type TSDataType int16
+
+type TSEncoding int16
+
+type TSCompressionType int16
+
const (
- BOOLEAN int32 = 0
- INT32 int32 = 1
- INT64 int32 = 2
- FLOAT int32 = 3
- DOUBLE int32 = 4
- TEXT int32 = 5
- INT int32 = 6
+ UNKNOW TSDataType = -1
+ BOOLEAN TSDataType = 0
+ INT32 TSDataType = 1
+ INT64 TSDataType = 2
+ FLOAT TSDataType = 3
+ DOUBLE TSDataType = 4
+ TEXT TSDataType = 5
)
-const (
- PLAIN int32 = 0
- PLAIN_DICTIONARY int32 = 1
- RLE int32 = 2
- DIFF int32 = 3
- TS_2DIFF int32 = 4
- BITMAP int32 = 5
- GORILLA_V1 int32 = 6
- REGULAR int32 = 7
- GORILLA int32 = 8
+const (
+ PLAIN TSEncoding = 0
+ PLAIN_DICTIONARY TSEncoding = 1
+ RLE TSEncoding = 2
+ DIFF TSEncoding = 3
+ TS_2DIFF TSEncoding = 4
+ BITMAP TSEncoding = 5
+ GORILLA_V1 TSEncoding = 6
+ REGULAR TSEncoding = 7
+ GORILLA TSEncoding = 8
)
-const (
- UNCOMPRESSED int32 = 0
- SNAPPY int32 = 1
- GZIP int32 = 2
- LZO int32 = 3
- SDT int32 = 4
- PAA int32 = 5
- PLA int32 = 6
- LZ4 int32 = 7
+const (
+ UNCOMPRESSED TSCompressionType = 0
+ SNAPPY TSCompressionType = 1
+ GZIP TSCompressionType = 2
+ LZO TSCompressionType = 3
+ SDT TSCompressionType = 4
+ PAA TSCompressionType = 5
+ PLA TSCompressionType = 6
+ LZ4 TSCompressionType = 7
)
+//TSStatusCode
const (
- SUCCESS_STATUS = 200
- STILL_EXECUTING_STATUS = 201
- INVALID_HANDLE_STATUS =202
- INCOMPATIBLE_VERSION =203
- NODE_DELETE_FAILED_ERROR =298
- ALIAS_ALREADY_EXIST_ERROR =299
- PATH_ALREADY_EXIST_ERROR =300
- PATH_NOT_EXIST_ERROR =301
- UNSUPPORTED_FETCH_METADATA_OPERATION_ERROR =302
- METADATA_ERROR =303
- TIMESERIES_NOT_EXIST =304
- OUT_OF_TTL_ERROR =305
- CONFIG_ADJUSTER =306
- MERGE_ERROR =307
- SYSTEM_CHECK_ERROR =308
- SYNC_DEVICE_OWNER_CONFLICT_ERROR =309
- SYNC_CONNECTION_EXCEPTION =310
- STORAGE_GROUP_PROCESSOR_ERROR =311
- STORAGE_GROUP_ERROR =312
- STORAGE_ENGINE_ERROR =313
- TSFILE_PROCESSOR_ERROR =314
- PATH_ILLEGAL =315
- LOAD_FILE_ERROR =316
- STORAGE_GROUP_NOT_READY =317
+ SuccessStatus int32 = 200
+ StillExecutingStatus int32 = 201
+ InvalidHandleStatus int32 = 202
+ IncompatibleVersion int32 = 203
- EXECUTE_STATEMENT_ERROR =400
- SQL_PARSE_ERROR =401
- GENERATE_TIME_ZONE_ERROR =402
- SET_TIME_ZONE_ERROR =403
- NOT_STORAGE_GROUP_ERROR =404
- QUERY_NOT_ALLOWED =405
- AST_FORMAT_ERROR =406
- LOGICAL_OPERATOR_ERROR =407
- LOGICAL_OPTIMIZE_ERROR =408
- UNSUPPORTED_FILL_TYPE_ERROR =409
- PATH_ERROR =410
- QUERY_PROCESS_ERROR =411
- WRITE_PROCESS_ERROR =412
+ NodeDeleteFailedError int32 = 298
+ AliasAlreadyExistError int32 = 299
+ PathAlreadyExistError int32 = 300
+ PathNotExistError int32 = 301
+ UnsupportedFetchMetadataOperationError int32 = 302
+ MetadataError int32 = 303
+ TimeseriesNotExist int32 = 304
+ OutOfTTLError int32 = 305
+ ConfigAdjuster int32 = 306
+ MergeError int32 = 307
+ SystemCheckError int32 = 308
+ SyncDeviceOwnerConflictError int32 = 309
+ SyncConnectionException int32 = 310
+ StorageGroupProcessorError int32 = 311
+ StorageGroupError int32 = 312
+ StorageEngineError int32 = 313
+ TsfileProcessorError int32 = 314
+ PathIllegal int32 = 315
+ LoadFileError int32 = 316
+ StorageGroupNotReady int32 = 317
- UNSUPPORTED_INDEX_FUNC_ERROR =421
- UNSUPPORTED_INDEX_TYPE_ERROR =422
+ ExecuteStatementError int32 = 400
+ SQLParseError int32 = 401
+ GenerateTimeZoneError int32 = 402
+ SetTimeZoneError int32 = 403
+ NotStorageGroupError int32 = 404
+ QueryNotAllowed int32 = 405
+ AstFormatError int32 = 406
+ LogicalOperatorError int32 = 407
+ LogicalOptimizeError int32 = 408
+ UnsupportedFillTypeError int32 = 409
+ PathError int32 = 410
+ QueryProcessError int32 = 411
+ WriteProcessError int32 = 412
+ WriteProcessReject int32 = 413
+ UnsupportedIndexFuncError int32 = 421
+ UnsupportedIndexTypeError int32 = 422
- INTERNAL_SERVER_ERROR =500
- CLOSE_OPERATION_ERROR =501
- READ_ONLY_SYSTEM_ERROR =502
- DISK_SPACE_INSUFFICIENT_ERROR =503
- START_UP_ERROR =504
- SHUT_DOWN_ERROR =505
- MULTIPLE_ERROR =506
+ InternalServerError int32 = 500
+ CloseOperationError int32 = 501
+ ReadOnlySystemError int32 = 502
+ DiskSpaceInsufficientError int32 = 503
+ StartUpError int32 = 504
+ ShutDownError int32 = 505
+ MultipleError int32 = 506
- WRONG_LOGIN_PASSWORD_ERROR =600
- NOT_LOGIN_ERROR =601
- NO_PERMISSION_ERROR =602
- UNINITIALIZED_AUTH_ERROR =603
+ WrongLoginPasswordError int32 = 600
+ NotLoginError int32 = 601
+ NoPermissionError int32 = 602
+ UninitializedAuthError int32 = 603
- // TODO-Cluster: update docs when ready to merge
- PARTITION_NOT_READY =700
- TIME_OUT =701
- NO_LEADER =702
- UNSUPPORTED_OPERATION =703
- NODE_READ_ONLY =704
- CONSISTENCY_FAILURE =705
+ PartitionNotReady int32 = 700
+ TimeOut int32 = 701
+ NoLeader int32 = 702
+ UnsupportedOperation int32 = 703
+ NodeReadOnly int32 = 704
+ ConsistencyFailure int32 = 705
+ NoConnection int32 = 706
+ NeedRedirection int32 = 707
)
diff --git a/client/rowrecord.go b/client/rowrecord.go
index a42b1d2..c951b3f 100644
--- a/client/rowrecord.go
+++ b/client/rowrecord.go
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -20,10 +20,14 @@
package client
type RowRecord struct {
- Timestamp int64
- Fields []Field
+ timestamp int64
+ fields []*Field
}
-func (r *RowRecord) AddField(field Field) {
- r.Fields[len(r.Fields)] = field
+func (r *RowRecord) GetFields() []*Field {
+ return r.fields
+}
+
+func (r *RowRecord) GetTimestamp() int64 {
+ return r.timestamp
}
diff --git a/client/rowrecord_test.go b/client/rowrecord_test.go
new file mode 100644
index 0000000..145633e
--- /dev/null
+++ b/client/rowrecord_test.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestRowRecord_GetFields(t *testing.T) {
+ type fields struct {
+ timestamp int64
+ fields []*Field
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want []*Field
+ }{
+ {
+ name: "GetFields",
+ fields: fields{
+ timestamp: 0,
+ fields: []*Field{&Field{
+ dataType: FLOAT,
+ name: "temperature",
+ value: 0.1,
+ }},
+ },
+ want: []*Field{&Field{
+ dataType: FLOAT,
+ name: "temperature",
+ value: 0.1,
+ }},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r := &RowRecord{
+ timestamp: tt.fields.timestamp,
+ fields: tt.fields.fields,
+ }
+ if got := r.GetFields(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("RowRecord.GetFields() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestRowRecord_GetTimestamp(t *testing.T) {
+ type fields struct {
+ timestamp int64
+ fields []*Field
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want int64
+ }{
+ {
+ name: "GetTimestamp",
+ fields: fields{
+ timestamp: 1024,
+ },
+ want: 1024,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r := &RowRecord{
+ timestamp: tt.fields.timestamp,
+ fields: tt.fields.fields,
+ }
+ if got := r.GetTimestamp(); got != tt.want {
+ t.Errorf("RowRecord.GetTimestamp() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/client/rpc_dataset.go b/client/rpc_dataset.go
deleted file mode 100644
index 15c9e58..0000000
--- a/client/rpc_dataset.go
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package client
-
-import (
- "bytes"
- "context"
- "encoding/binary"
- "errors"
- "github.com/apache/iotdb-client-go/rpc"
-)
-
-type IoTDBRpcDataSet struct {
- Sql string
- ColumnNameList []string
- ColumnTypeList []string
- ColumnNameIndex map[string]int32
- QueryId int64
- SessionId int64
- IgnoreTimeStamp bool
- Client *rpc.TSIServiceClient
- QueryDataSet *rpc.TSQueryDataSet
- FetchSize int32
- emptyResultSet bool
- isClosed bool
- time []byte
- rowsIndex int
- currentBitmap []byte
- ColumnTypeDeduplicatedList []string
- Values [][]byte
- HasCachedRecord bool
- ColumnOrdinalMap map[string]int32
- columnSize int
-}
-
-const (
- Flag = 0x80
- StarIndex = 2
- TimeStampStr = "Time"
-)
-
-func (r *IoTDBRpcDataSet) init() {
- r.emptyResultSet = false
- r.isClosed = false
- r.rowsIndex = 0
- r.HasCachedRecord = false
- r.ColumnOrdinalMap = make(map[string]int32)
-}
-
-func NewIoTDBRpcDataSet(dataSet *SessionDataSet, fetchSize int32) *IoTDBRpcDataSet {
- var ioTDBRpcDataSet = IoTDBRpcDataSet{
- Sql: dataSet.Sql,
- QueryId: dataSet.QueryId,
- SessionId: dataSet.SessionId,
- QueryDataSet: dataSet.QueryDataSet,
- IgnoreTimeStamp: dataSet.IgnoreTimeStamp,
- Client: dataSet.Client,
- FetchSize: fetchSize,
- }
- ioTDBRpcDataSet.init()
- ioTDBRpcDataSet.columnSize = len(dataSet.ColumnNameList)
- if !ioTDBRpcDataSet.IgnoreTimeStamp {
- ioTDBRpcDataSet.ColumnNameList = append(ioTDBRpcDataSet.ColumnNameList, TimeStampStr)
- ioTDBRpcDataSet.ColumnTypeList = append(ioTDBRpcDataSet.ColumnTypeList, "INT64")
- ioTDBRpcDataSet.ColumnOrdinalMap[TimeStampStr] = 1
- }
- if dataSet.ColumnNameIndex != nil {
- ioTDBRpcDataSet.ColumnTypeDeduplicatedList = make([]string, len(dataSet.ColumnNameIndex))
- for i := 0; i < len(dataSet.ColumnNameList); i++ {
- name := dataSet.ColumnNameList[i]
- ioTDBRpcDataSet.ColumnNameList = append(ioTDBRpcDataSet.ColumnNameList, name)
- ioTDBRpcDataSet.ColumnTypeList = append(ioTDBRpcDataSet.ColumnTypeList, dataSet.ColumnTypeList[i])
- if _, ok := ioTDBRpcDataSet.ColumnOrdinalMap[name]; !ok {
- index := dataSet.ColumnNameIndex[name]
- ioTDBRpcDataSet.ColumnOrdinalMap[name] = index + StarIndex
- ioTDBRpcDataSet.ColumnTypeDeduplicatedList[index] = dataSet.ColumnTypeList[i]
- }
- }
- } else {
- ioTDBRpcDataSet.ColumnTypeDeduplicatedList = []string{}
- index := StarIndex
- for i := 0; i < len(dataSet.ColumnNameList); i++ {
- name := dataSet.ColumnNameList[i]
- ioTDBRpcDataSet.ColumnNameList = append(ioTDBRpcDataSet.ColumnNameList, name)
- ioTDBRpcDataSet.ColumnTypeList = append(ioTDBRpcDataSet.ColumnTypeList, dataSet.ColumnTypeList[i])
- if _, ok := ioTDBRpcDataSet.ColumnOrdinalMap[name]; !ok {
- ioTDBRpcDataSet.ColumnOrdinalMap[name] = int32(index)
- index++
- ioTDBRpcDataSet.ColumnTypeDeduplicatedList = append(ioTDBRpcDataSet.ColumnTypeDeduplicatedList,
- dataSet.ColumnTypeList[i])
- }
- }
- }
- ioTDBRpcDataSet.time = make([]byte, 8)
- ioTDBRpcDataSet.currentBitmap = make([]byte, len(ioTDBRpcDataSet.ColumnTypeDeduplicatedList))
- ioTDBRpcDataSet.Values = make([][]byte, len(ioTDBRpcDataSet.ColumnTypeDeduplicatedList))
- for i := 0; i < len(ioTDBRpcDataSet.Values); i++ {
- dataType := ioTDBRpcDataSet.ColumnTypeDeduplicatedList[i]
- switch dataType {
- case "BOOLEAN":
- ioTDBRpcDataSet.Values[i] = make([]byte, 1)
- break
- case "INT32", "FLOAT":
- ioTDBRpcDataSet.Values[i] = make([]byte, 4)
- break
- case "INT64", "DOUBLE":
- ioTDBRpcDataSet.Values[i] = make([]byte, 8)
- break
- case "TEXT":
- ioTDBRpcDataSet.Values[i] = nil
- break
- }
- }
- return &ioTDBRpcDataSet
-}
-
-func (r *IoTDBRpcDataSet) hasCachedResults() bool {
- return r.QueryDataSet != nil && len(r.QueryDataSet.Time) != 0
-}
-
-func (r *IoTDBRpcDataSet) getColumnSize() int {
- return r.columnSize
-}
-
-func (r *IoTDBRpcDataSet) constructOneRow() error {
- r.time = r.QueryDataSet.Time[:8]
- r.QueryDataSet.Time = r.QueryDataSet.Time[8:]
- for i := 0; i < len(r.QueryDataSet.BitmapList); i++ {
- bitmapBuffer := r.QueryDataSet.BitmapList[i]
- if r.rowsIndex%8 == 0 {
- r.currentBitmap[i] = bitmapBuffer[0]
- r.QueryDataSet.BitmapList[i] = bitmapBuffer[1:]
- }
- if !r.isNull(int32(i), r.rowsIndex) {
- valueBuffer := r.QueryDataSet.ValueList[i]
- dataType := r.ColumnTypeDeduplicatedList[i]
- switch dataType {
- case "BOOLEAN":
- r.Values[i] = valueBuffer[:1]
- r.QueryDataSet.ValueList[i] = valueBuffer[1:]
- case "INT32", "FLOAT":
- r.Values[i] = valueBuffer[:4]
- r.QueryDataSet.ValueList[i] = valueBuffer[4:]
- case "INT64", "DOUBLE":
- r.Values[i] = valueBuffer[:8]
- r.QueryDataSet.ValueList[i] = valueBuffer[8:]
- case "TEXT":
- buf := bytes.NewBuffer(valueBuffer[:4])
- var tmp uint32
- binary.Read(buf, binary.BigEndian, &tmp)
- length := int(tmp)
- r.Values[i] = valueBuffer[4 : 4+length]
- r.QueryDataSet.ValueList[i] = valueBuffer[4+length:]
- default:
- return errors.New("unsupported data type " + dataType)
- }
- }
- }
- r.rowsIndex++
- r.HasCachedRecord = true
- return nil
-}
-
-func (r *IoTDBRpcDataSet) isNull(index int32, rowNum int) bool {
- bitmap := r.currentBitmap[index]
- shift := rowNum % 8
- return ((Flag >> shift) & (bitmap & 0xff)) == 0
-}
-
-func (r *IoTDBRpcDataSet) isNil(columnIndex int) (bool, error) {
- columnName, err := r.findColumnNameByIndex(columnIndex)
- if err != nil {
- return false, err
- }
- index := r.ColumnOrdinalMap[columnName] - StarIndex
- if index < 0 {
- return true, nil
- }
- return r.isNull(index, r.rowsIndex-1), nil
-}
-
-func (r *IoTDBRpcDataSet) findColumnNameByIndex(columnIndex int) (string, error) {
- if columnIndex <= 0 {
- return "", errors.New("column index should start from 1")
- }
- if columnIndex > len(r.ColumnNameList) {
- return "", errors.New("column index out of range")
- }
- return r.ColumnNameList[columnIndex-1], nil
-}
-
-func (r *IoTDBRpcDataSet) fetchResults() bool {
- r.rowsIndex = 0
- request := rpc.TSFetchResultsReq{
- SessionId: r.SessionId,
- Statement: r.Sql,
- FetchSize: r.FetchSize,
- QueryId: r.QueryId,
- IsAlign: true,
- }
- resp, _ := r.Client.FetchResults(context.Background(), &request)
- if !resp.HasResultSet {
- r.emptyResultSet = true
- } else {
- r.QueryDataSet = resp.GetQueryDataSet()
- }
- return resp.HasResultSet
-}
-
-func (r *IoTDBRpcDataSet) close() {
- if r.isClosed {
- return
- }
- if r.Client != nil {
- closeReq := rpc.TSCloseOperationReq{SessionId: r.SessionId, QueryId: &r.QueryId}
- r.Client.CloseOperation(context.Background(), &closeReq)
- }
- r.Client = nil
- r.isClosed = true
-}
-
-func (r *IoTDBRpcDataSet) next() bool {
- if r.hasCachedResults() {
- r.constructOneRow()
- return true
- }
- if r.emptyResultSet {
- return false
- }
- if r.fetchResults() {
- r.constructOneRow()
- return true
- }
- return false
-}
diff --git a/client/rpcdataset.go b/client/rpcdataset.go
new file mode 100644
index 0000000..599a77d
--- /dev/null
+++ b/client/rpcdataset.go
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "math"
+ "time"
+
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+const (
+ startIndex = 2 // subtracted from a columnOrdinalMap ordinal to obtain the zero-based value-column index (see getColumnIndex)
+ flag = 0x80 // bitmap mask for the first row of a group of eight; isNull shifts it right by rowIndex%8
+)
+
+var (
+ errClosed error = errors.New("DataSet is Closed") // returned by data set operations that are attempted after Close
+ tsTypeMap map[string]TSDataType = map[string]TSDataType{ // translates the type names passed to NewIoTDBRpcDataSet into TSDataType values
+ "BOOLEAN": BOOLEAN,
+ "INT32": INT32,
+ "INT64": INT64,
+ "FLOAT": FLOAT,
+ "DOUBLE": DOUBLE,
+ "TEXT": TEXT,
+ }
+)
+
+type IoTDBRpcDataSet struct { // IoTDBRpcDataSet walks the rows of a query result one at a time, fetching further batches through the RPC client.
+ columnCount int // len(columnNameList) at construction time
+ sessionId int64 // sent with fetch and close requests
+ queryId int64 // sent with fetch and close requests
+ lastReadWasNull bool // updated by the typed getters: true when the last read hit a null cell
+ rowsIndex int // rows consumed from the current batch; incremented by constructOneRow, reset by fetchResults
+ queryDataSet *rpc.TSQueryDataSet // current batch; its Time/BitmapList/ValueList buffers are consumed by constructOneRow
+ sql string // statement text, sent again with every fetch request
+ fetchSize int32 // sent with every fetch request
+ columnNameList []string // column names as given to the constructor
+ columnTypeList []TSDataType // data type for each entry of columnNameList
+ columnOrdinalMap map[string]int32 // column name -> ordinal; value-column index = ordinal - startIndex
+ columnTypeDeduplicatedList []TSDataType // data types indexed by value-column index
+ currentBitmap []byte // most recently read null-bitmap byte of each value column
+ time []byte // the 8 timestamp bytes of the current row
+ values [][]byte // raw bytes of the current row, one slice per value column
+ client *rpc.TSIServiceClient // used for FetchResults and CloseOperation; may be nil
+ emptyResultSet bool // set when a fetch reports that there is no further result set
+ ignoreTimeStamp bool // when false, the constructor registers TimestampColumnName at ordinal 1
+ closed bool // set by Close; most methods return a zero value or errClosed afterwards
+}
+
+// getColumnIndex translates a column name into its zero-based value-column
+// index (ordinal minus startIndex). A closed data set always yields -1.
+func (s *IoTDBRpcDataSet) getColumnIndex(columnName string) int32 {
+	if !s.closed {
+		ordinal := s.columnOrdinalMap[columnName]
+		return ordinal - startIndex
+	}
+	return -1
+}
+
+// getColumnType returns the data type of the named value column.
+//
+// UNKNOW is returned when the data set is closed or when the name does not
+// resolve to a value column (for example the timestamp column, whose index is
+// negative, or a name that is not part of the result set). Previously such
+// names indexed the type slice with a negative value and panicked.
+func (s *IoTDBRpcDataSet) getColumnType(columnName string) TSDataType {
+	if s.closed {
+		return UNKNOW
+	}
+	index := int(s.getColumnIndex(columnName))
+	if index < 0 || index >= len(s.columnTypeDeduplicatedList) {
+		return UNKNOW
+	}
+	return s.columnTypeDeduplicatedList[index]
+}
+
+// isNull reports whether the cell at (columnIndex, rowIndex) is null according
+// to the bitmap byte currently cached for that column.
+//
+// A closed data set, a column index outside the bitmap, or a negative row
+// index (no row has been read yet) is reported as null instead of panicking
+// on an out-of-range index or a negative shift count.
+func (s *IoTDBRpcDataSet) isNull(columnIndex int, rowIndex int) bool {
+	if s.closed {
+		return true
+	}
+	if columnIndex < 0 || columnIndex >= len(s.currentBitmap) || rowIndex < 0 {
+		return true
+	}
+	// Bit 7 describes the first row of each group of eight, bit 0 the last.
+	mask := byte(flag) >> uint(rowIndex%8)
+	return s.currentBitmap[columnIndex]&mask == 0
+}
+
+func (s *IoTDBRpcDataSet) constructOneRow() error {
+ if s.closed {
+ return errClosed
+ }
+
+ // simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
+ s.time = s.queryDataSet.Time[:8]
+ s.queryDataSet.Time = s.queryDataSet.Time[8:]
+
+ for i := 0; i < len(s.queryDataSet.BitmapList); i++ {
+ bitmapBuffer := s.queryDataSet.BitmapList[i]
+ if s.rowsIndex%8 == 0 {
+ s.currentBitmap[i] = bitmapBuffer[0]
+ s.queryDataSet.BitmapList[i] = bitmapBuffer[1:]
+ }
+ if !s.isNull(i, s.rowsIndex) {
+ valueBuffer := s.queryDataSet.ValueList[i]
+ dataType := s.columnTypeDeduplicatedList[i]
+ switch dataType {
+ case BOOLEAN:
+ s.values[i] = valueBuffer[:1]
+ s.queryDataSet.ValueList[i] = valueBuffer[1:]
+ break
+ case INT32:
+ s.values[i] = valueBuffer[:4]
+ s.queryDataSet.ValueList[i] = valueBuffer[4:]
+ break
+ case INT64:
+ s.values[i] = valueBuffer[:8]
+ s.queryDataSet.ValueList[i] = valueBuffer[8:]
+ break
+ case FLOAT:
+ s.values[i] = valueBuffer[:4]
+ s.queryDataSet.ValueList[i] = valueBuffer[4:]
+ break
+ case DOUBLE:
+ s.values[i] = valueBuffer[:8]
+ s.queryDataSet.ValueList[i] = valueBuffer[8:]
+ break
+ case TEXT:
+ length := bytesToInt32(valueBuffer[:4])
+ s.values[i] = valueBuffer[4 : 4+length]
+ s.queryDataSet.ValueList[i] = valueBuffer[4+length:]
+ default:
+ return fmt.Errorf("unsupported data type %d", dataType)
+ }
+ }
+ }
+ s.rowsIndex++
+ return nil
+}
+
+// GetTimestamp decodes the timestamp bytes of the current row. It returns -1
+// once the data set has been closed.
+func (s *IoTDBRpcDataSet) GetTimestamp() int64 {
+	if !s.closed {
+		return bytesToInt64(s.time)
+	}
+	return -1
+}
+
+// getText renders the current row's value of the named column as a string.
+// The timestamp column is formatted as RFC 3339 after multiplying the stored
+// value by 1000000 to obtain nanoseconds. A closed data set, an unresolvable
+// column, or a null cell yields ""; for the latter two lastReadWasNull is set.
+func (s *IoTDBRpcDataSet) getText(columnName string) string {
+	switch {
+	case s.closed:
+		return ""
+	case columnName == TimestampColumnName:
+		stamp := bytesToInt64(s.time)
+		return time.Unix(0, stamp*1000000).Format(time.RFC3339)
+	}
+
+	idx := s.getColumnIndex(columnName)
+	missing := idx < 0 || int(idx) >= len(s.values) || s.isNull(int(idx), s.rowsIndex-1)
+	s.lastReadWasNull = missing
+	if missing {
+		return ""
+	}
+	return s.getString(int(idx), s.columnTypeDeduplicatedList[idx])
+}
+
+// getString formats the raw bytes stored for columnIndex according to
+// dataType. It returns "" for a closed data set or an unrecognised type.
+func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string {
+	if s.closed {
+		return ""
+	}
+	raw := s.values[columnIndex]
+	switch dataType {
+	case TEXT:
+		return string(raw)
+	case BOOLEAN:
+		text := "false"
+		if raw[0] != 0 {
+			text = "true"
+		}
+		return text
+	case INT32:
+		number := bytesToInt32(raw)
+		return int32ToString(number)
+	case INT64:
+		number := bytesToInt64(raw)
+		return int64ToString(number)
+	case FLOAT:
+		return float32ToString(math.Float32frombits(binary.BigEndian.Uint32(raw)))
+	case DOUBLE:
+		return float64ToString(math.Float64frombits(binary.BigEndian.Uint64(raw)))
+	}
+	return ""
+}
+
+// getValue returns the current row's value of the named column as a Go value:
+// bool, int32, int64, float32, float64 or string depending on the column type.
+//
+// nil is returned when the data set is closed, no row has been read yet, the
+// name does not resolve to a value column (this includes the timestamp
+// column), the cell is null, or the type is not recognised. Unresolvable
+// names used to index the internal slices out of range and panic.
+func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} {
+	if s.closed || s.rowsIndex < 1 {
+		return nil
+	}
+	columnIndex := int(s.getColumnIndex(columnName))
+	if columnIndex < 0 || columnIndex >= len(s.values) ||
+		columnIndex >= len(s.currentBitmap) || columnIndex >= len(s.columnTypeDeduplicatedList) {
+		return nil
+	}
+	if s.isNull(columnIndex, s.rowsIndex-1) {
+		return nil
+	}
+
+	valueBytes := s.values[columnIndex]
+	switch s.columnTypeDeduplicatedList[columnIndex] {
+	case BOOLEAN:
+		return valueBytes[0] != 0
+	case INT32:
+		return bytesToInt32(valueBytes)
+	case INT64:
+		return bytesToInt64(valueBytes)
+	case FLOAT:
+		return math.Float32frombits(binary.BigEndian.Uint32(valueBytes))
+	case DOUBLE:
+		return math.Float64frombits(binary.BigEndian.Uint64(valueBytes))
+	case TEXT:
+		return string(valueBytes)
+	default:
+		return nil
+	}
+}
+
+// getRowRecord materialises the current row as a RowRecord with one Field per
+// entry of columnNameList. It returns errClosed on a closed data set.
+func (s *IoTDBRpcDataSet) getRowRecord() (*RowRecord, error) {
+	if s.closed {
+		return nil, errClosed
+	}
+
+	row := &RowRecord{fields: make([]*Field, s.columnCount)}
+	for i := range row.fields {
+		name := s.columnNameList[i]
+		dataType := s.getColumnType(name)
+		value := s.getValue(name)
+		row.fields[i] = &Field{
+			name:     name,
+			dataType: dataType,
+			value:    value,
+		}
+	}
+	row.timestamp = s.GetTimestamp()
+	return row, nil
+}
+
+// getBool returns the current row's value of the named BOOLEAN column.
+//
+// It returns false for a closed data set, an unresolvable column or a null
+// cell; in the latter two cases lastReadWasNull is set to true. A successful
+// read now resets lastReadWasNull to false, matching the other typed getters
+// (previously a stale true from an earlier read was left in place).
+func (s *IoTDBRpcDataSet) getBool(columnName string) bool {
+	if s.closed {
+		return false
+	}
+	columnIndex := s.getColumnIndex(columnName)
+	if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
+		s.lastReadWasNull = true
+		return false
+	}
+	s.lastReadWasNull = false
+	return s.values[columnIndex][0] != 0
+}
+
+// scan copies the current row into dest, column by column in columnNameList
+// order. At most min(columnCount, len(dest)) columns are copied.
+//
+// Each destination must be a pointer to the Go type matching the column type
+// (*bool, *int32, *int64, *float32, *float64, *string) or a *string, which
+// receives the textual form. Null cells and columns that do not resolve to a
+// value column leave their destination untouched.
+//
+// It returns errClosed on a closed data set, an error naming the offending
+// dest index on a type mismatch, and an error for an unsupported column type
+// (previously the scan stopped silently and reported success).
+func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
+	if s.closed {
+		return errClosed
+	}
+
+	count := s.columnCount
+	if count > len(dest) {
+		count = len(dest)
+	}
+
+	for i := 0; i < count; i++ {
+		columnName := s.columnNameList[i]
+		columnIndex := int(s.getColumnIndex(columnName))
+		if columnIndex < 0 || columnIndex >= len(s.values) || columnIndex >= len(s.columnTypeDeduplicatedList) {
+			// No value column behind this name; treat it like a null cell.
+			continue
+		}
+		if s.isNull(columnIndex, s.rowsIndex-1) {
+			continue
+		}
+
+		d := dest[i]
+		valueBytes := s.values[columnIndex]
+		dataType := s.columnTypeDeduplicatedList[columnIndex]
+		switch dataType {
+		case BOOLEAN:
+			value := valueBytes[0] != 0
+			switch t := d.(type) {
+			case *bool:
+				*t = value
+			case *string:
+				if value {
+					*t = "true"
+				} else {
+					*t = "false"
+				}
+			default:
+				return fmt.Errorf("dest[%d] types must be *bool or *string", i)
+			}
+		case INT32:
+			value := bytesToInt32(valueBytes)
+			switch t := d.(type) {
+			case *int32:
+				*t = value
+			case *string:
+				*t = int32ToString(value)
+			default:
+				return fmt.Errorf("dest[%d] types must be *int32 or *string", i)
+			}
+		case INT64:
+			value := bytesToInt64(valueBytes)
+			switch t := d.(type) {
+			case *int64:
+				*t = value
+			case *string:
+				*t = int64ToString(value)
+			default:
+				return fmt.Errorf("dest[%d] types must be *int64 or *string", i)
+			}
+		case FLOAT:
+			value := math.Float32frombits(binary.BigEndian.Uint32(valueBytes))
+			switch t := d.(type) {
+			case *float32:
+				*t = value
+			case *string:
+				*t = float32ToString(value)
+			default:
+				return fmt.Errorf("dest[%d] types must be *float32 or *string", i)
+			}
+		case DOUBLE:
+			value := math.Float64frombits(binary.BigEndian.Uint64(valueBytes))
+			switch t := d.(type) {
+			case *float64:
+				*t = value
+			case *string:
+				*t = float64ToString(value)
+			default:
+				return fmt.Errorf("dest[%d] types must be *float64 or *string", i)
+			}
+		case TEXT:
+			switch t := d.(type) {
+			case *string:
+				*t = string(valueBytes)
+			default:
+				return fmt.Errorf("dest[%d] types must be *string", i)
+			}
+		default:
+			return fmt.Errorf("unsupported data type %d", dataType)
+		}
+	}
+	return nil
+}
+
+// getFloat returns the current row's value of the named FLOAT column.
+//
+// It returns 0 for a closed data set, an unresolvable column or a null cell;
+// in the latter two cases lastReadWasNull is set to true. The index guard
+// mirrors getText, so an unknown column name no longer panics.
+func (s *IoTDBRpcDataSet) getFloat(columnName string) float32 {
+	if s.closed {
+		return 0
+	}
+	columnIndex := s.getColumnIndex(columnName)
+	if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
+		s.lastReadWasNull = true
+		return 0
+	}
+	s.lastReadWasNull = false
+	return math.Float32frombits(binary.BigEndian.Uint32(s.values[columnIndex]))
+}
+
+// getDouble returns the current row's value of the named DOUBLE column.
+//
+// It returns 0 for a closed data set, an unresolvable column or a null cell;
+// in the latter two cases lastReadWasNull is set to true. The index guard
+// mirrors getText, so an unknown column name no longer panics.
+func (s *IoTDBRpcDataSet) getDouble(columnName string) float64 {
+	if s.closed {
+		return 0
+	}
+	columnIndex := s.getColumnIndex(columnName)
+	if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
+		s.lastReadWasNull = true
+		return 0
+	}
+	s.lastReadWasNull = false
+	return math.Float64frombits(binary.BigEndian.Uint64(s.values[columnIndex]))
+}
+
+// getInt32 returns the current row's value of the named INT32 column.
+//
+// It returns 0 for a closed data set, an unresolvable column or a null cell;
+// in the latter two cases lastReadWasNull is set to true. The index guard
+// mirrors getText, so an unknown column name no longer panics.
+func (s *IoTDBRpcDataSet) getInt32(columnName string) int32 {
+	if s.closed {
+		return 0
+	}
+	columnIndex := s.getColumnIndex(columnName)
+	if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
+		s.lastReadWasNull = true
+		return 0
+	}
+	s.lastReadWasNull = false
+	return bytesToInt32(s.values[columnIndex])
+}
+
+// getInt64 returns the current row's value of the named INT64 column; for
+// TimestampColumnName it returns the row timestamp instead.
+//
+// It returns 0 for a closed data set, an unresolvable column or a null cell;
+// in the latter two cases lastReadWasNull is set to true. The value slice is
+// only touched after the index and null checks (it used to be indexed first,
+// which panicked for unknown column names).
+func (s *IoTDBRpcDataSet) getInt64(columnName string) int64 {
+	if s.closed {
+		return 0
+	}
+	if columnName == TimestampColumnName {
+		return bytesToInt64(s.time)
+	}
+
+	columnIndex := s.getColumnIndex(columnName)
+	if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) {
+		s.lastReadWasNull = true
+		return 0
+	}
+	s.lastReadWasNull = false
+	return bytesToInt64(s.values[columnIndex])
+}
+
+// hasCachedResults reports whether the current batch still contains
+// timestamp bytes that have not been consumed.
+func (s *IoTDBRpcDataSet) hasCachedResults() bool {
+	if s.closed || s.queryDataSet == nil {
+		return false
+	}
+	return len(s.queryDataSet.Time) > 0
+}
+
+// next advances to the following row, fetching another batch from the server
+// when the cached one is exhausted.
+//
+// It returns (true, nil) when a row is available, (false, nil) at the end of
+// the result set, and (false, err) when the data set is closed, the fetch
+// fails, or the row cannot be decoded. Fetch and decode errors used to be
+// discarded.
+func (s *IoTDBRpcDataSet) next() (bool, error) {
+	if s.closed {
+		return false, errClosed
+	}
+
+	if !s.hasCachedResults() {
+		if s.emptyResultSet {
+			return false, nil
+		}
+		hasResultSet, err := s.fetchResults()
+		if err != nil {
+			return false, err
+		}
+		// A reported result set without any rows also means there is nothing to read.
+		if !hasResultSet || !s.hasCachedResults() {
+			return false, nil
+		}
+	}
+
+	if err := s.constructOneRow(); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+// fetchResults asks the server for the next batch of the query.
+//
+// It returns whether the response carries a result set. When it does, the
+// batch replaces s.queryDataSet and the row counter restarts; otherwise
+// emptyResultSet is set. An error is returned when the data set is closed,
+// no RPC client is attached, the call fails, or the response status is not
+// successful.
+func (s *IoTDBRpcDataSet) fetchResults() (bool, error) {
+	if s.closed {
+		return false, errClosed
+	}
+	if s.client == nil {
+		return false, errors.New("data set has no rpc client to fetch results with")
+	}
+
+	// Keyed fields keep the request correct if the generated struct changes.
+	req := rpc.TSFetchResultsReq{
+		SessionId: s.sessionId,
+		Statement: s.sql,
+		FetchSize: s.fetchSize,
+		QueryId:   s.queryId,
+		IsAlign:   true,
+	}
+	resp, err := s.client.FetchResults(context.Background(), &req)
+	if err != nil {
+		return false, err
+	}
+	if err = VerifySuccess(resp.Status); err != nil {
+		return false, err
+	}
+
+	if !resp.HasResultSet {
+		s.emptyResultSet = true
+		return false, nil
+	}
+	// Restart the row counter only with a new batch, so the bitmap position
+	// of the last row stays valid when nothing was fetched.
+	s.rowsIndex = 0
+	s.queryDataSet = resp.GetQueryDataSet()
+	return true, nil
+}
+
+func (s *IoTDBRpcDataSet) IsClosed() bool { // IsClosed reports whether Close has already been called on this data set.
+ return s.closed
+}
+
+// Close releases the query on the server (when an RPC client is attached) and
+// clears all local state. Calling Close on an already closed data set is a
+// no-op that returns nil.
+//
+// The returned error is the failure of the CloseOperation call or of its
+// status check; local state is cleared regardless. The error used to be
+// assigned to a shadowed variable and was therefore never returned.
+func (s *IoTDBRpcDataSet) Close() (err error) {
+	if s.IsClosed() {
+		return nil
+	}
+	if s.client != nil {
+		// Copy the id so the request does not point at a field reset below.
+		queryId := s.queryId
+		closeRequest := &rpc.TSCloseOperationReq{
+			SessionId: s.sessionId,
+			QueryId:   &queryId,
+		}
+
+		status, closeErr := s.client.CloseOperation(context.Background(), closeRequest)
+		if closeErr == nil {
+			closeErr = VerifySuccess(status)
+		}
+		err = closeErr
+	}
+
+	s.columnCount = 0
+	s.sessionId = -1
+	s.queryId = -1
+	s.rowsIndex = -1
+	s.queryDataSet = nil
+	s.sql = ""
+	s.fetchSize = 0
+	s.columnNameList = nil
+	s.columnTypeList = nil
+	s.columnOrdinalMap = nil
+	s.columnTypeDeduplicatedList = nil
+	s.currentBitmap = nil
+	s.time = nil
+	s.values = nil
+	s.client = nil
+	s.emptyResultSet = true
+	s.closed = true
+	return err
+}
+
+// NewIoTDBRpcDataSet builds a data set over the first batch of a query result.
+//
+// columnNameList and columnTypes describe the result columns position by
+// position; columnTypes holds type names that are translated through
+// tsTypeMap. When columnNameIndex is non-nil it supplies the value-column
+// index of every name; otherwise indexes are assigned in order of first
+// appearance. Unless ignoreTimeStamp is set, TimestampColumnName is
+// registered at ordinal 1.
+//
+// Without columnNameIndex, types are now recorded once per unique name so
+// that they stay aligned with the assigned indexes; they used to be stored by
+// list position, which broke type lookup after a repeated column name.
+func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypes []string,
+	columnNameIndex map[string]int32,
+	queryId int64, client *rpc.TSIServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet,
+	ignoreTimeStamp bool, fetchSize int32) *IoTDBRpcDataSet {
+
+	ds := &IoTDBRpcDataSet{
+		sql:              sql,
+		columnNameList:   columnNameList,
+		ignoreTimeStamp:  ignoreTimeStamp,
+		queryId:          queryId,
+		client:           client,
+		sessionId:        sessionId,
+		queryDataSet:     queryDataSet,
+		fetchSize:        fetchSize,
+		currentBitmap:    make([]byte, len(columnNameList)),
+		values:           make([][]byte, len(columnTypes)),
+		columnCount:      len(columnNameList),
+		closed:           false,
+		columnTypeList:   make([]TSDataType, 0, len(columnNameList)),
+		columnOrdinalMap: make(map[string]int32, len(columnNameList)+1),
+	}
+
+	if !ignoreTimeStamp {
+		ds.columnOrdinalMap[TimestampColumnName] = 1
+	}
+
+	if columnNameIndex != nil {
+		// The caller dictates the value-column index of every name.
+		ds.columnTypeDeduplicatedList = make([]TSDataType, len(columnNameIndex))
+		for i, name := range columnNameList {
+			dataType := tsTypeMap[columnTypes[i]]
+			ds.columnTypeList = append(ds.columnTypeList, dataType)
+			if _, exists := ds.columnOrdinalMap[name]; !exists {
+				index := columnNameIndex[name]
+				ds.columnOrdinalMap[name] = index + startIndex
+				ds.columnTypeDeduplicatedList[index] = dataType
+			}
+		}
+		return ds
+	}
+
+	// No explicit mapping: number the unique names in order of appearance.
+	ds.columnTypeDeduplicatedList = make([]TSDataType, 0, ds.columnCount)
+	for i, name := range columnNameList {
+		dataType := tsTypeMap[columnTypes[i]]
+		ds.columnTypeList = append(ds.columnTypeList, dataType)
+		if _, exists := ds.columnOrdinalMap[name]; !exists {
+			ds.columnOrdinalMap[name] = int32(len(ds.columnTypeDeduplicatedList)) + startIndex
+			ds.columnTypeDeduplicatedList = append(ds.columnTypeDeduplicatedList, dataType)
+		}
+	}
+	return ds
+}
diff --git a/client/rpcdataset_test.go b/client/rpcdataset_test.go
new file mode 100644
index 0000000..56a59b1
--- /dev/null
+++ b/client/rpcdataset_test.go
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/apache/iotdb-client-go/rpc"
+)
+
+func createIoTDBRpcDataSet() *IoTDBRpcDataSet {
+ columns := []string{"root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status"}
+ dataTypes := []string{"INT32", "DOUBLE", "INT64", "FLOAT", "TEXT", "BOOLEAN"}
+ columnNameIndex := map[string]int32{
+ "root.ln.device1.restart_count": 2,
+ "root.ln.device1.price": 1,
+ "root.ln.device1.tick_count": 5,
+ "root.ln.device1.temperature": 4,
+ "root.ln.device1.description": 0,
+ "root.ln.device1.status": 3,
+ }
+ var queyrId int64 = 1
+ var sessionId int64 = 1
+ var client *rpc.TSIServiceClient = nil
+ queryDataSet := rpc.TSQueryDataSet{
+ Time: []byte{0, 0, 1, 118, 76, 52, 0, 236, 0, 0, 1, 118, 76, 52, 25, 228, 0, 0, 1, 118, 76, 52, 41, 42, 0, 0, 1, 118, 76, 52, 243, 148, 0, 0, 1, 118, 76, 95, 98, 255},
+ ValueList: [][]byte{{0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 20 [...]
+ BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248}},
+ }
+ return NewIoTDBRpcDataSet("select * from root.ln.device1", columns, dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false, DefaultFetchSize)
+}
+
+func TestIoTDBRpcDataSet_getColumnType(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+
+ ds := createIoTDBRpcDataSet()
+ closedDataSet := createIoTDBRpcDataSet()
+ closedDataSet.Close()
+ tests := []struct {
+ name string
+ dataSet *IoTDBRpcDataSet
+ args args
+ want TSDataType
+ }{
+ {
+ name: "Normal",
+ dataSet: ds,
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: INT64,
+ }, {
+ name: "Closed",
+ dataSet: closedDataSet,
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: UNKNOW,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := tt.dataSet
+ if got := s.getColumnType(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getColumnType() = %v, want %v", got, tt.want)
+ }
+ s.Close()
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getColumnIndex(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ closedDataSet := createIoTDBRpcDataSet()
+ closedDataSet.Close()
+ tests := []struct {
+ name string
+ dataset *IoTDBRpcDataSet
+ args args
+ want int32
+ }{
+ {
+ name: "Normal",
+ dataset: createIoTDBRpcDataSet(),
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: 5,
+ }, {
+ name: "Closed",
+ dataset: closedDataSet,
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: -1,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := tt.dataset
+ if got := s.getColumnIndex(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getColumnIndex() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_isNull(t *testing.T) {
+ type args struct {
+ columnIndex int
+ rowIndex int
+ }
+ ds := createIoTDBRpcDataSet()
+ ds.next()
+
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "Normal",
+ args: args{
+ columnIndex: 0,
+ rowIndex: 0,
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.isNull(tt.args.columnIndex, tt.args.rowIndex); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.isNull() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getValue(t *testing.T) {
+
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want interface{}
+ }{
+ {
+ name: "restart_count",
+ args: args{
+ columnName: "root.ln.device1.restart_count",
+ },
+ want: int32(1),
+ }, {
+ name: "tick_count",
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: int64(3333333),
+ }, {
+ name: "price",
+ args: args{
+ columnName: "root.ln.device1.price",
+ },
+ want: float64(1988.2),
+ }, {
+ name: "temperature",
+ args: args{
+ columnName: "root.ln.device1.temperature",
+ },
+ want: float32(12.1),
+ }, {
+ name: "description",
+ args: args{
+ columnName: "root.ln.device1.description",
+ },
+ want: "Test Device 1",
+ }, {
+ name: "status",
+ args: args{
+ columnName: "root.ln.device1.status",
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getValue(tt.args.columnName); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("IoTDBRpcDataSet.getValue() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// TestIoTDBRpcDataSet_scan scans the fixture's first row once into natively
+// typed destinations and once into string destinations.
+func TestIoTDBRpcDataSet_scan(t *testing.T) {
+	// expectation keeps the error first and the values second, as printed on failure.
+	type expectation struct {
+		err    error
+		values []interface{}
+	}
+
+	// Destinations filled by scan.
+	var (
+		restartCount int32
+		price        float64
+		tickCount    int64
+		temperature  float32
+		description  string
+		status       bool
+
+		restartCountStr string
+		priceStr        string
+		tickCountStr    string
+		temperatureStr  string
+		descriptionStr  string
+		statusStr       string
+	)
+
+	// Expected contents of the destinations after scanning.
+	wantRestartCount := int32(1)
+	wantPrice := float64(1988.2)
+	wantTickCount := int64(3333333)
+	wantTemperature := float32(12.1)
+	wantDescription := "Test Device 1"
+	wantStatus := true
+
+	wantRestartCountStr := "1"
+	wantPriceStr := "1988.2"
+	wantTickCountStr := "3333333"
+	wantTemperatureStr := "12.1"
+	wantDescriptionStr := "Test Device 1"
+	wantStatusStr := "true"
+
+	cases := []struct {
+		name string
+		dest []interface{}
+		want expectation
+	}{
+		{
+			name: "Normal",
+			dest: []interface{}{&restartCount, &price, &tickCount, &temperature, &description, &status},
+			want: expectation{
+				err:    nil,
+				values: []interface{}{&wantRestartCount, &wantPrice, &wantTickCount, &wantTemperature, &wantDescription, &wantStatus},
+			},
+		},
+		{
+			name: "String",
+			dest: []interface{}{&restartCountStr, &priceStr, &tickCountStr, &temperatureStr, &descriptionStr, &statusStr},
+			want: expectation{
+				err:    nil,
+				values: []interface{}{&wantRestartCountStr, &wantPriceStr, &wantTickCountStr, &wantTemperatureStr, &wantDescriptionStr, &wantStatusStr},
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			dataSet := createIoTDBRpcDataSet()
+			dataSet.next()
+			err := dataSet.scan(tc.dest...)
+			if err != tc.want.err {
+				t.Errorf("IoTDBRpcDataSet.scan() error = %v, wantErr %v", err, tc.want.err)
+			}
+			got := tc.dest
+			if !reflect.DeepEqual(got, tc.want.values) {
+				t.Errorf("IoTDBRpcDataSet.scan(), dest = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
+
+// TestIoTDBRpcDataSet_GetTimestamp checks the timestamp of the fixture's first row.
+func TestIoTDBRpcDataSet_GetTimestamp(t *testing.T) {
+	cases := []struct {
+		name string
+		want int64
+	}{
+		{name: "GetTimestamp", want: 1607596245228},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			dataSet := createIoTDBRpcDataSet()
+			dataSet.next()
+			got := dataSet.GetTimestamp()
+			if got == tc.want {
+				return
+			}
+			t.Errorf("IoTDBRpcDataSet.GetTimestamp() = %v, want %v", got, tc.want)
+		})
+	}
+}
+
+func TestIoTDBRpcDataSet_getText(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "restart_count",
+ args: args{
+ columnName: "root.ln.device1.restart_count",
+ },
+ want: "1",
+ }, {
+ name: "price",
+ args: args{
+ columnName: "root.ln.device1.price",
+ },
+ want: "1988.2",
+ }, {
+ name: "tick_count",
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: "3333333",
+ }, {
+ name: "temperature",
+ args: args{
+ columnName: "root.ln.device1.temperature",
+ },
+ want: "12.1",
+ }, {
+ name: "description",
+ args: args{
+ columnName: "root.ln.device1.description",
+ },
+ want: "Test Device 1",
+ }, {
+ name: "status",
+ args: args{
+ columnName: "root.ln.device1.status",
+ },
+ want: "true",
+ }, {
+ name: TimestampColumnName,
+ args: args{
+ columnName: TimestampColumnName,
+ },
+ want: "2020-12-10T18:30:45+08:00",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getText(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getText() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getBool(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "status",
+ args: args{
+ columnName: "root.ln.device1.status",
+ },
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getBool(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getBool() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getFloat(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want float32
+ }{
+ {
+ name: "temperature",
+ args: args{
+ columnName: "root.ln.device1.temperature",
+ },
+ want: 12.1,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getFloat(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getFloat() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getDouble(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want float64
+ }{
+ {
+ name: "price",
+ args: args{
+ columnName: "root.ln.device1.price",
+ },
+ want: 1988.2,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getDouble(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getDouble() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getInt32(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want int32
+ }{
+ {
+ name: "restart_count",
+ args: args{
+ columnName: "root.ln.device1.restart_count",
+ },
+ want: 1,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getInt32(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getInt32() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestIoTDBRpcDataSet_getInt64(t *testing.T) {
+ type args struct {
+ columnName string
+ }
+ tests := []struct {
+ name string
+ args args
+ want int64
+ }{
+ {
+ name: "tick_count",
+ args: args{
+ columnName: "root.ln.device1.tick_count",
+ },
+ want: 3333333,
+ }, {
+ name: TimestampColumnName,
+ args: args{
+ columnName: TimestampColumnName,
+ },
+ want: 1607596245228,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := createIoTDBRpcDataSet()
+ s.next()
+ if got := s.getInt64(tt.args.columnName); got != tt.want {
+ t.Errorf("IoTDBRpcDataSet.getInt64() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// TestIoTDBRpcDataSet_getRowRecord checks the record built from the fixture's
+// first row: its timestamp, its number of fields and every field's name,
+// type and value.
+//
+// The timestamp and the field count used to go unchecked, and the comparison
+// loop indexed the expectation by the length of the result.
+func TestIoTDBRpcDataSet_getRowRecord(t *testing.T) {
+	tests := []struct {
+		name    string
+		want    *RowRecord
+		wantErr bool
+	}{
+		{
+			name: "",
+			want: &RowRecord{
+				// Timestamp of the fixture's first row.
+				timestamp: 1607596245228,
+				fields: []*Field{
+					{name: "root.ln.device1.restart_count", dataType: INT32, value: int32(1)},
+					{name: "root.ln.device1.price", dataType: DOUBLE, value: float64(1988.2)},
+					{name: "root.ln.device1.tick_count", dataType: INT64, value: int64(3333333)},
+					{name: "root.ln.device1.temperature", dataType: FLOAT, value: float32(12.1)},
+					{name: "root.ln.device1.description", dataType: TEXT, value: string("Test Device 1")},
+					{name: "root.ln.device1.status", dataType: BOOLEAN, value: bool(true)},
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := createIoTDBRpcDataSet()
+			s.next()
+			got, err := s.getRowRecord()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("IoTDBRpcDataSet.getRowRecord() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got == nil {
+				t.Fatalf("IoTDBRpcDataSet.getRowRecord() = nil, want %v", tt.want)
+			}
+			if got.timestamp != tt.want.timestamp {
+				t.Errorf("IoTDBRpcDataSet.getRowRecord() timestamp = %v, want %v", got.timestamp, tt.want.timestamp)
+			}
+			if len(got.fields) != len(tt.want.fields) {
+				t.Fatalf("IoTDBRpcDataSet.getRowRecord() returned %d fields, want %d", len(got.fields), len(tt.want.fields))
+			}
+
+			for i, wantField := range tt.want.fields {
+				gotField := got.fields[i]
+				if gotField.dataType != wantField.dataType || gotField.name != wantField.name || gotField.value != wantField.value {
+					t.Errorf("IoTDBRpcDataSet.getRowRecord() field[%d] = %v, want %v", i, gotField, wantField)
+				}
+			}
+		})
+	}
+}
+
+// TestIoTDBRpcDataSet_Close checks that closing a data set that has read one
+// row reports no error.
+func TestIoTDBRpcDataSet_Close(t *testing.T) {
+	cases := []struct {
+		name    string
+		wantErr bool
+	}{
+		{name: "", wantErr: false},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			dataSet := createIoTDBRpcDataSet()
+			dataSet.next()
+			err := dataSet.Close()
+			failed := err != nil
+			if failed != tc.wantErr {
+				t.Errorf("IoTDBRpcDataSet.Close() error = %v, wantErr %v", err, tc.wantErr)
+			}
+		})
+	}
+}
diff --git a/client/session.go b/client/session.go
index 8172d25..a8db017 100644
--- a/client/session.go
+++ b/client/session.go
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -24,18 +24,48 @@ import (
"context"
"encoding/binary"
"errors"
- "github.com/apache/iotdb-client-go/rpc"
- "github.com/apache/thrift/lib/go/thrift"
+ "fmt"
"net"
- "strconv"
+ "reflect"
"time"
+
+ "github.com/apache/iotdb-client-go/rpc"
+ "github.com/apache/thrift/lib/go/thrift"
)
-const protocolVersion = rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3
+const (
+ // DefaultTimeZone is the zone id Open uses when Config.TimeZone is empty.
+ DefaultTimeZone = "Asia/Shanghai"
+ // DefaultFetchSize is the fetch size Open uses when Config.FetchSize is not positive.
+ DefaultFetchSize = 1024
+)
var lengthError = errors.New("deviceIds, times, measurementsList and valuesList's size should be equal")
+// Config holds the connection settings a Session reads when it is opened.
+type Config struct {
+ // Host and Port are joined with net.JoinHostPort to form the server address.
+ Host string
+ Port string
+ // UserName and Password are sent in the open-session request.
+ UserName string
+ Password string
+ // FetchSize is sent with statement and query requests; Open replaces a
+ // non-positive value with DefaultFetchSize.
+ FetchSize int32
+ // TimeZone is sent as the session zone id; Open replaces an empty value
+ // with DefaultTimeZone.
+ TimeZone string
+}
+
+// Session is a client-side handle for one server session. Create it with
+// NewSession and connect it with Open.
+type Session struct {
+ // config is the caller-supplied configuration; Open fills in defaults.
+ config *Config
+ // client is the Thrift service client built in Open.
+ client *rpc.TSIServiceClient
+ // sessionId is the id taken from the open-session response.
+ sessionId int64
+ // NOTE(review): isClose is not read or written in the code visible here — confirm its use.
+ isClose bool
+ // trans is the transport created in Open and closed in Close.
+ trans thrift.TTransport
+ // requestStatementId is obtained via RequestStatementId in Open and sent
+ // with statement and query requests.
+ requestStatementId int64
+}
+
func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error {
+ if s.config.FetchSize <= 0 {
+ s.config.FetchSize = DefaultFetchSize
+ }
+ if s.config.TimeZone == "" {
+ s.config.TimeZone = DefaultTimeZone
+ }
+
var protocolFactory thrift.TProtocolFactory
var err error
s.trans, err = thrift.NewTSocketTimeout(net.JoinHostPort(s.config.Host, s.config.Port), time.Duration(connectionTimeoutInMs))
@@ -54,326 +84,228 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
} else {
protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
- iProtocol := protocolFactory.GetProtocol(s.trans)
- oProtocol := protocolFactory.GetProtocol(s.trans)
- s.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iProtocol, oProtocol))
- tSOpenSessionReq := rpc.TSOpenSessionReq{
- ClientProtocol: protocolVersion,
- ZoneId: s.config.ZoneId,
- Username: &s.config.User,
- Password: &s.config.Passwd,
- }
- tSOpenSessionResp, err := s.client.OpenSession(context.Background(), &tSOpenSessionReq)
+ iprot := protocolFactory.GetProtocol(s.trans)
+ oprot := protocolFactory.GetProtocol(s.trans)
+ s.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+ req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: &s.config.UserName,
+ Password: &s.config.Password}
+ resp, err := s.client.OpenSession(context.Background(), &req)
if err != nil {
return err
}
- s.sessionId = tSOpenSessionResp.GetSessionId()
+ s.sessionId = resp.GetSessionId()
s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
if err != nil {
return err
}
- s.SetTimeZone(s.config.ZoneId)
- return err
-}
-func (s *Session) CheckTimeseriesExists(path string) bool {
- dataSet, _ := s.ExecuteQueryStatement("SHOW TIMESERIES " + path)
- result := dataSet.HasNext()
- dataSet.CloseOperationHandle()
- return result
+ s.SetTimeZone(s.config.TimeZone)
+ s.config.TimeZone, err = s.GetTimeZone()
+ return err
}
-func (s *Session) Close() error {
- tSCloseSessionReq := rpc.NewTSCloseSessionReq()
- tSCloseSessionReq.SessionId = s.sessionId
- status, err := s.client.CloseSession(context.Background(), tSCloseSessionReq)
- s.trans.Close()
+// Close ends the server-side session and closes the transport.
+//
+// The transport is closed even when the CloseSession RPC fails, so a failed
+// RPC cannot leave the connection open. An RPC error takes precedence over a
+// transport-close error. The returned status is always nil.
+func (s *Session) Close() (r *rpc.TSStatus, err error) {
+	req := rpc.NewTSCloseSessionReq()
+	req.SessionId = s.sessionId
+	_, err = s.client.CloseSession(context.Background(), req)
+	// Close the transport before inspecting err so the early return below
+	// cannot skip it.
+	closeErr := s.trans.Close()
if err != nil {
- return err
+	return nil, err
}
- err = verifySuccess(status)
- return err
+	return nil, closeErr
}
/*
*set one storage group
- *
*param
*storageGroupId: string, storage group name (starts from root)
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) SetStorageGroup(storageGroupId string) error {
- status, err := s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// SetStorageGroup issues the SetStorageGroup RPC for storageGroupId on this
+// session and returns the server status together with any RPC error.
+func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
+	return s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
}
/*
*delete one storage group
- *
*param
*storageGroupId: string, storage group name (starts from root)
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) DeleteStorageGroup(storageGroupId string) error {
- status, err := s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// DeleteStorageGroup issues the DeleteStorageGroups RPC with a one-element
+// list holding storageGroupId and returns the server status together with
+// any RPC error.
+func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
+	groups := []string{storageGroupId}
+	return s.client.DeleteStorageGroups(context.Background(), s.sessionId, groups)
}
/*
*delete multiple storage group
- *
*param
*storageGroupIds: []string, paths of the target storage groups
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) DeleteStorageGroups(storageGroupIds []string) error {
- status, err := s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// DeleteStorageGroups issues the DeleteStorageGroups RPC for all given
+// storage group paths and returns the server status together with any RPC
+// error.
+func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *rpc.TSStatus, err error) {
+	ctx := context.Background()
+	return s.client.DeleteStorageGroups(ctx, s.sessionId, storageGroupIds)
}
/*
*create single time series
- *
*params
*path: string, complete time series path (starts from root)
*dataType: int32, data type for this time series
*encoding: int32, data type for this time series
*compressor: int32, compressing type for this time series
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) CreateTimeseries(path string, dataType int32, encoding int32, compressor int32) error {
- request := rpc.TSCreateTimeseriesReq{
- SessionId: s.sessionId,
- Path: path, DataType: dataType,
- Encoding: encoding,
- Compressor: compressor,
- }
+func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *rpc.TSStatus, err error) {
+ request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
+ Compressor: int32(compressor), Attributes: attributes, Tags: tags}
status, err := s.client.CreateTimeseries(context.Background(), &request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+ return status, err
}
/*
*create multiple time series
- *
*params
*paths: []string, complete time series paths (starts from root)
*dataTypes: []int32, data types for time series
*encodings: []int32, encodings for time series
*compressors: []int32, compressing types for time series
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []int32, encodings []int32, compressors []int32) error {
- request := rpc.TSCreateMultiTimeseriesReq{
- SessionId: s.sessionId,
- Paths: paths,
- DataTypes: dataTypes,
- Encodings: encodings,
- Compressors: compressors,
+func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r *rpc.TSStatus, err error) {
+ destTypes := make([]int32, len(dataTypes))
+ for i, t := range dataTypes {
+ destTypes[i] = int32(t)
}
- status, err := s.client.CreateMultiTimeseries(context.Background(), &request)
- if err != nil {
- return err
+
+ destEncodings := make([]int32, len(encodings))
+ for i, e := range encodings {
+ destEncodings[i] = int32(e)
}
- err = verifySuccess(status)
- return err
+
+ destCompressions := make([]int32, len(compressors))
+ for i, e := range compressors {
+ destCompressions[i] = int32(e)
+ }
+
+ request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
+ Encodings: destEncodings, Compressors: destCompressions}
+ r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+
+ return r, err
}
/*
*delete multiple time series, including data and schema
- *
*params
*paths: []string, time series paths, which should be complete (starts from root)
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) DeleteTimeseries(paths []string) error {
- status, err := s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// DeleteTimeseries issues the DeleteTimeseries RPC for paths and returns the
+// server status together with any RPC error.
+func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error) {
+	ctx := context.Background()
+	return s.client.DeleteTimeseries(ctx, s.sessionId, paths)
}
/*
*delete all startTime <= data <= endTime in multiple time series
- *
*params
*paths: []string, time series array that the data in
*startTime: int64, start time of deletion range
*endTime: int64, end time of deletion range
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) error {
- request := rpc.TSDeleteDataReq{SessionId: s.sessionId,
- Paths: paths,
- StartTime: startTime,
- EndTime: endTime,
- }
- status, err := s.client.DeleteData(context.Background(), &request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// DeleteData issues the DeleteData RPC for paths with the given start and
+// end times and returns the server status together with any RPC error.
+func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *rpc.TSStatus, err error) {
+	req := &rpc.TSDeleteDataReq{
+		SessionId: s.sessionId,
+		Paths:     paths,
+		StartTime: startTime,
+		EndTime:   endTime,
+	}
+	return s.client.DeleteData(context.Background(), req)
}
/*
*special case for inserting one row of String (TEXT) value
- *
*params
*deviceId: string, time series path for device
*measurements: []string, sensor names
*values: []string, values to be inserted, for each sensor
*timestamp: int64, indicate the timestamp of the row of data
- *
+ *return
+ *error: correctness of operation
*/
-func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) error {
- request := rpc.TSInsertStringRecordReq{
- SessionId: s.sessionId,
- DeviceId: deviceId,
- Measurements: measurements,
- Values: values,
- Timestamp: timestamp,
- }
- status, err := s.client.InsertStringRecord(context.Background(), &request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// InsertStringRecord issues the InsertStringRecord RPC carrying one row of
+// string values for deviceId at timestamp, and returns the server status
+// together with any RPC error.
+func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *rpc.TSStatus, err error) {
+	req := &rpc.TSInsertStringRecordReq{
+		SessionId:    s.sessionId,
+		DeviceId:     deviceId,
+		Measurements: measurements,
+		Values:       values,
+		Timestamp:    timestamp,
+	}
+	return s.client.InsertStringRecord(context.Background(), req)
}
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) error {
- request := rpc.TSInsertStringRecordReq{
- SessionId: s.sessionId,
- DeviceId: deviceId,
- Measurements: measurements,
- Values: values,
- Timestamp: timestamp,
- }
- status, err := s.client.TestInsertStringRecord(context.Background(), &request)
- if err != nil {
- return err
+func (s *Session) GetTimeZone() (string, error) {
+ if s.config.TimeZone != "" {
+ return s.config.TimeZone, nil
+ } else {
+ resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
+ if err != nil {
+ return "", err
+ }
+ return resp.TimeZone, nil
}
- err = verifySuccess(status)
- return err
}
-/*
- *special case for inserting multiple rows of String (TEXT) value
- *
- *params
- *deviceIds: []string, time series paths for device
- *measurements: [][]string, each element of outer list indicates measurements of a device
- *values: [][]interface{}, values to be inserted, for each device
- *timestamps: []int64, timestamps for records
- *
- */
-func (s *Session) InsertStringRecords(deviceIds []string, measurements [][]string, values [][]string,
- timestamps []int64) error {
- request := rpc.TSInsertStringRecordsReq{
- SessionId: s.sessionId,
- DeviceIds: deviceIds,
- MeasurementsList: measurements,
- ValuesList: values,
- Timestamps: timestamps,
- }
- status, err := s.client.InsertStringRecords(context.Background(), &request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// SetTimeZone issues the SetTimeZone RPC with timeZone and, when the call
+// returns no error, records timeZone in the session config.
+//
+// On an RPC error the cached value is left unchanged so it does not name a
+// zone the server never accepted. The code inside the returned status is not
+// inspected here; callers should check it.
+func (s *Session) SetTimeZone(timeZone string) (r *rpc.TSStatus, err error) {
+	request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone}
+	r, err = s.client.SetTimeZone(context.Background(), &request)
+	if err != nil {
+		return r, err
+	}
+	s.config.TimeZone = timeZone
+	return r, nil
}
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertStringRecords(deviceIds []string, measurements [][]string, values [][]string,
- timestamps []int64) error {
- request := rpc.TSInsertStringRecordsReq{
- SessionId: s.sessionId,
- DeviceIds: deviceIds,
- MeasurementsList: measurements,
- ValuesList: values,
- Timestamps: timestamps,
- }
- status, err := s.client.TestInsertStringRecords(context.Background(), &request)
- if err != nil {
- return err
+func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
+ request := rpc.TSExecuteStatementReq{
+ SessionId: s.sessionId,
+ Statement: sql,
+ StatementId: s.requestStatementId,
+ FetchSize: &s.config.FetchSize,
}
- err = verifySuccess(status)
- return err
+ resp, err := s.client.ExecuteStatement(context.Background(), &request)
+ return s.genDataSet(sql, resp), err
}
-/*
- *insert one row of record into database, if you want improve your performance, please use insertTablet method
- *
- *params
- *deviceId: string, time series path for device
- *measurements: []string, sensor names
- *dataTypes: []int32, list of dataType, indicate the data type for each sensor
- *values: []interface{}, values to be inserted, for each sensor
- *timestamp: int64, indicate the timestamp of the row of data
- *
- */
-func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []int32, values []interface{},
- timestamp int64) error {
- request := s.genInsertRecordReq(deviceId, measurements, dataTypes, values, timestamp)
- status, err := s.client.InsertRecord(context.Background(), request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// ExecuteQueryStatement issues the ExecuteQueryStatement RPC for sql and
+// wraps the response in a SessionDataSet.
+//
+// If the RPC returns an error, or no response, it returns a nil data set and
+// that error instead of dereferencing the response. Optional response fields
+// are read through their generated getters, so an unset QueryId or
+// IgnoreTimeStamp yields the getter's default rather than a nil-pointer panic.
+func (s *Session) ExecuteQueryStatement(sql string) (*SessionDataSet, error) {
+	request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
+		FetchSize: &s.config.FetchSize}
+	resp, err := s.client.ExecuteQueryStatement(context.Background(), &request)
+	if err != nil || resp == nil {
+		return nil, err
+	}
+	return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.GetQueryId(), s.client, s.sessionId, resp.QueryDataSet, resp.GetIgnoreTimeStamp(), s.config.FetchSize), nil
}
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertRecord(deviceId string, measurements []string, dataTypes []int32, values []interface{},
- timestamp int64) error {
- request := s.genInsertRecordReq(deviceId, measurements, dataTypes, values, timestamp)
- status, err := s.client.TestInsertRecord(context.Background(), request)
- if err != nil {
- return err
+func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
+ measurements []string,
+ types []TSDataType,
+ values []interface{}) (*rpc.TSInsertRecordReq, error) {
+ request := &rpc.TSInsertRecordReq{}
+ request.SessionId = s.sessionId
+ request.DeviceId = deviceId
+ request.Timestamp = time
+ request.Measurements = measurements
+
+ if bys, err := valuesToBytes(types, values); err == nil {
+ request.Values = bys
+ } else {
+ return nil, err
}
- err = verifySuccess(status)
- return err
+ return request, nil
}
-func (s *Session) genInsertRecordReq(deviceId string, measurements []string, dataTypes []int32, values []interface{},
- timestamp int64) *rpc.TSInsertRecordReq {
- request := rpc.TSInsertRecordReq{
- SessionId: s.sessionId,
- DeviceId: deviceId,
- Measurements: measurements,
- Timestamp: timestamp,
+func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
+ request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values)
+ if err != nil {
+ return nil, err
}
- request.Values = valuesToBytes(dataTypes, values)
- return &request
+ r, err = s.client.InsertRecord(context.Background(), request)
+ return r, err
}
/*
@@ -388,145 +320,68 @@ func (s *Session) genInsertRecordReq(deviceId string, measurements []string, dat
*timestamps: []int64, timestamps for records
*
*/
-func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]int32, values [][]interface{},
- timestamps []int64) error {
+func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
+ timestamps []int64) (r *rpc.TSStatus, err error) {
request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps)
if err != nil {
- return err
+ return nil, err
} else {
- status, err := s.client.InsertRecords(context.Background(), request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+ return s.client.InsertRecords(context.Background(), request)
}
-
-}
-
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]int32, values [][]interface{},
- timestamps []int64) error {
- request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps)
- if err != nil {
- return err
- } else {
- status, err := s.client.TestInsertRecords(context.Background(), request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
- }
-}
-
-func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]int32, values [][]interface{},
- timestamps []int64) (*rpc.TSInsertRecordsReq, error) {
- length := len(deviceIds)
- if length != len(timestamps) || length != len(measurements) || length != len(values) {
- return nil, lengthError
- }
- request := rpc.TSInsertRecordsReq{
- SessionId: s.sessionId,
- DeviceIds: deviceIds,
- MeasurementsList: measurements,
- Timestamps: timestamps,
- }
- v := make([][]byte, length)
- for i := 0; i < len(measurements); i++ {
- v[i] = valuesToBytes(dataTypes[i], values[i])
- }
- request.ValuesList = v
- return &request, nil
}
/*
- *insert one tablet, in a tablet, for each timestamp, the number of measurements is same
+ *insert multiple tablets, tablets are independent to each other
*
*params
- *tablet: utils.Tablet, a tablet specified above
+ *tablets: []utils.Tablet, list of tablets
*
*/
-func (s *Session) InsertTablet(tablet Tablet) error {
- tablet.SortTablet()
- request := s.genInsertTabletReq(tablet)
- status, err := s.client.InsertTablet(context.Background(), request)
+func (s *Session) InsertTablets(tablets []*Tablet) (r *rpc.TSStatus, err error) {
+ request, err := s.genInsertTabletsReq(tablets)
if err != nil {
- return err
+ return nil, err
}
- err = verifySuccess(status)
- return err
+ return s.client.InsertTablets(context.Background(), request)
}
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertTablet(tablet Tablet) error {
- tablet.SortTablet()
- request := s.genInsertTabletReq(tablet)
- status, err := s.client.TestInsertTablet(context.Background(), request)
- if err != nil {
- return err
+func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err error) {
+ request := rpc.TSExecuteBatchStatementReq{
+ SessionId: s.sessionId,
+ Statements: inserts,
}
- err = verifySuccess(status)
- return err
+ return s.client.ExecuteBatchStatement(context.Background(), &request)
}
-func (s *Session) genInsertTabletReq(tablet Tablet) *rpc.TSInsertTabletReq {
- request := rpc.TSInsertTabletReq{
- SessionId: s.sessionId,
- DeviceId: tablet.GetDeviceId(),
- Types: tablet.GetTypes(),
- Measurements: tablet.Measurements,
- Values: tablet.GetBinaryValues(),
- Timestamps: tablet.GetBinaryTimestamps(),
- Size: tablet.GetRowNumber(),
+func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) {
+ request := rpc.TSRawDataQueryReq{
+ SessionId: s.sessionId,
+ Paths: paths,
+ FetchSize: &s.config.FetchSize,
+ StartTime: startTime,
+ EndTime: endTime,
+ StatementId: s.requestStatementId,
}
- return &request
+ resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
+ return s.genDataSet("", resp), err
}
-/*
- *insert multiple tablets, tablets are independent to each other
- *
- *params
- *tablets: []utils.Tablet, list of tablets
- *
- */
-func (s *Session) InsertTablets(tablets []Tablet) error {
- for index := range tablets {
- tablets[index].SortTablet()
- }
- request := s.genInsertTabletsReq(tablets)
- status, err := s.client.InsertTablets(context.Background(), request)
- if err != nil {
- return err
+func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
+ request := rpc.TSExecuteStatementReq{
+ SessionId: s.sessionId,
+ Statement: sql,
+ StatementId: s.requestStatementId,
+ FetchSize: &s.config.FetchSize,
}
- err = verifySuccess(status)
- return err
+ resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request)
+ return s.genDataSet(sql, resp), err
}
-/**
- * This method NOT insert data into database and the server just return after accept the request,
- * this method should be used to test other time cost in iotdb
- */
-func (s *Session) TestInsertTablets(tablets []Tablet) error {
- for index := range tablets {
- tablets[index].SortTablet()
- }
- request := s.genInsertTabletsReq(tablets)
- status, err := s.client.TestInsertTablets(context.Background(), request)
- if err != nil {
- return err
- }
- err = verifySuccess(status)
- return err
+// genDataSet wraps an execute-statement response in a SessionDataSet.
+//
+// It returns nil when resp is nil; callers pass the response straight
+// through without checking the RPC error first. Optional response fields are
+// read through their generated getters, so an unset QueryId or
+// IgnoreTimeStamp yields the getter's default rather than a nil-pointer panic.
+func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet {
+	if resp == nil {
+		return nil
+	}
+	return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.GetQueryId(), s.client, s.sessionId, resp.QueryDataSet, resp.GetIgnoreTimeStamp(), s.config.FetchSize)
}
-func (s *Session) genInsertTabletsReq(tablets []Tablet) *rpc.TSInsertTabletsReq {
+func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) {
var (
length = len(tablets)
deviceIds = make([]string, length)
@@ -537,164 +392,146 @@ func (s *Session) genInsertTabletsReq(tablets []Tablet) *rpc.TSInsertTabletsReq
sizeList = make([]int32, length)
)
for index, tablet := range tablets {
- deviceIds[index] = tablet.GetDeviceId()
+ deviceIds[index] = tablet.deviceId
measurementsList[index] = tablet.GetMeasurements()
- valuesList[index] = tablet.GetBinaryValues()
- timestampsList[index] = tablet.GetBinaryTimestamps()
- typesList[index] = tablet.GetTypes()
- sizeList[index] = tablet.GetRowNumber()
+
+ values, err := tablet.GetValuesBytes()
+ if err != nil {
+ return nil, err
+ }
+
+ valuesList[index] = values
+ timestampsList[index] = tablet.GetTimestampBytes()
+ typesList[index] = tablet.getDataTypes()
+ sizeList[index] = int32(tablet.RowSize)
}
request := rpc.TSInsertTabletsReq{
- SessionId: s.sessionId,
- DeviceIds: deviceIds,
- TypesList: typesList,
+ SessionId: s.sessionId,
+ DeviceIds: deviceIds,
+ TypesList: typesList,
MeasurementsList: measurementsList,
- ValuesList: valuesList,
- TimestampsList: timestampsList,
- SizeList: sizeList,
+ ValuesList: valuesList,
+ TimestampsList: timestampsList,
+ SizeList: sizeList,
}
- return &request
+ return &request, nil
}
-func valuesToBytes(dataTypes []int32, values []interface{}) []byte {
- buf := bytes.NewBuffer([]byte{})
- for i := 0; i < len(dataTypes); i++ {
- dataType := int16(dataTypes[i])
- binary.Write(buf, binary.BigEndian, dataType)
- switch dataTypes[i] {
- case 0, 1, 2, 3, 4:
- binary.Write(buf, binary.BigEndian, values[i])
- break
- case 5:
- tmp := (int32)(len(values[i].(string)))
- binary.Write(buf, binary.BigEndian, tmp)
- buf.WriteString(values[i].(string))
- break
- }
+func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
+ timestamps []int64) (*rpc.TSInsertRecordsReq, error) {
+ length := len(deviceIds)
+ if length != len(timestamps) || length != len(measurements) || length != len(values) {
+ return nil, lengthError
}
- return buf.Bytes()
-}
-
-func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
- request := rpc.TSExecuteStatementReq{
- SessionId: s.sessionId,
- Statement: sql,
- StatementId: s.requestStatementId,
- FetchSize: &s.config.FetchSize,
+ request := rpc.TSInsertRecordsReq{
+ SessionId: s.sessionId,
+ DeviceIds: deviceIds,
+ MeasurementsList: measurements,
+ Timestamps: timestamps,
}
- resp, err := s.client.ExecuteStatement(context.Background(), &request)
- dataSet := s.genDataSet(sql, resp)
- sessionDataSet := NewSessionDataSet(dataSet)
- return sessionDataSet, err
-}
-
-func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet {
- dataSet := SessionDataSet{
- Sql: sql,
- ColumnNameList: resp.GetColumns(),
- ColumnTypeList: resp.GetDataTypeList(),
- ColumnNameIndex: resp.GetColumnNameIndexMap(),
- QueryId: resp.GetQueryId(),
- SessionId: s.sessionId,
- IgnoreTimeStamp: resp.GetIgnoreTimeStamp(),
- Client: s.client,
- QueryDataSet: resp.GetQueryDataSet(),
+ v := make([][]byte, length)
+ for i := 0; i < len(measurements); i++ {
+ if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil {
+ v[i] = bys
+ } else {
+ return nil, err
+ }
}
- return &dataSet
+ request.ValuesList = v
+ return &request, nil
}
-func (s *Session) ExecuteQueryStatement(sql string) (*SessionDataSet, error) {
- request := rpc.TSExecuteStatementReq{
- SessionId: s.sessionId,
- Statement: sql,
- StatementId: s.requestStatementId,
- FetchSize: &s.config.FetchSize,
- }
- resp, err := s.client.ExecuteQueryStatement(context.Background(), &request)
- dataSet := s.genDataSet(sql, resp)
- sessionDataSet := NewSessionDataSet(dataSet)
- return sessionDataSet, err
-}
+func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) {
+ buff := &bytes.Buffer{}
+ for i, t := range dataTypes {
+ binary.Write(buff, binary.BigEndian, int16(t))
+ v := values[i]
+ if v == nil {
+ return nil, fmt.Errorf("values[%d] can't be nil", i)
+ }
-func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
- request := rpc.TSExecuteStatementReq{
- SessionId: s.sessionId,
- Statement: sql,
- StatementId: s.requestStatementId,
- FetchSize: &s.config.FetchSize,
+ switch t {
+ case BOOLEAN:
+ switch v.(type) {
+ case bool:
+ binary.Write(buff, binary.BigEndian, v)
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v))
+ }
+ case INT32:
+ switch v.(type) {
+ case int32:
+ binary.Write(buff, binary.BigEndian, v)
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v))
+ }
+ case INT64:
+ switch v.(type) {
+ case int64:
+ binary.Write(buff, binary.BigEndian, v)
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v))
+ }
+ case FLOAT:
+ switch v.(type) {
+ case float32:
+ binary.Write(buff, binary.BigEndian, v)
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v))
+ }
+ case DOUBLE:
+ switch v.(type) {
+ case float64:
+ binary.Write(buff, binary.BigEndian, v)
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v))
+ }
+ case TEXT:
+ switch v.(type) {
+ case string:
+ text := v.(string)
+ size := len(text)
+ binary.Write(buff, binary.BigEndian, int32(size))
+ binary.Write(buff, binary.BigEndian, []byte(text))
+ default:
+ return nil, fmt.Errorf("values[%d] %v(%v) must be string", i, v, reflect.TypeOf(v))
+ }
+ default:
+ return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT)", i)
+ }
}
- resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request)
- dataSet := s.genDataSet(sql, resp)
- sessionDataSet := NewSessionDataSet(dataSet)
- return sessionDataSet, err
+ return buff.Bytes(), nil
}
-func (s *Session) ExecuteBatchStatement(inserts []string) error {
- request := rpc.TSExecuteBatchStatementReq{
- SessionId: s.sessionId,
- Statements: inserts,
- }
- status, err := s.client.ExecuteBatchStatement(context.Background(), &request)
+func (s *Session) InsertTablet(tablet *Tablet) (r *rpc.TSStatus, err error) {
+ request, err := s.genTSInsertTabletReq(tablet)
if err != nil {
- return err
+ return nil, err
}
- err = verifySuccess(status)
- return err
+ return s.client.InsertTablet(context.Background(), request)
}
-func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) {
- request := rpc.TSRawDataQueryReq{
- SessionId: s.sessionId,
- Paths: paths,
- FetchSize: &s.config.FetchSize,
- StartTime: startTime,
- EndTime: endTime,
- StatementId: s.requestStatementId,
- }
- resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
- dataSet := s.genDataSet("", resp)
- sessionDataSet := NewSessionDataSet(dataSet)
- return sessionDataSet, err
-}
-
-func (s *Session) GetTimeZone() (string, error) {
- if s.config.ZoneId != "" {
- return s.config.ZoneId, nil
- } else {
- resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
- return resp.TimeZone, err
- }
-}
-
-func (s *Session) SetTimeZone(timeZone string) error {
- request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone}
- status, err := s.client.SetTimeZone(context.Background(), &request)
+func (s *Session) genTSInsertTabletReq(tablet *Tablet) (request *rpc.TSInsertTabletReq, err error) {
+ values, err := tablet.GetValuesBytes()
if err != nil {
- return err
+ return nil, err
}
- err = verifySuccess(status)
- if err != nil {
- return err
+ request = &rpc.TSInsertTabletReq{
+ SessionId: s.sessionId,
+ DeviceId: tablet.deviceId,
+ Measurements: tablet.GetMeasurements(),
+ Values: values,
+ Timestamps: tablet.GetTimestampBytes(),
+ Types: tablet.getDataTypes(),
+ Size: int32(tablet.RowSize),
}
- s.config.ZoneId = timeZone
- return nil
+ return request, nil
}
-func verifySuccess(status *rpc.TSStatus) error {
- if status.GetCode() == MULTIPLE_ERROR {
- return VerifySuccess(status.GetSubStatus())
- }
- if status.GetCode() != SUCCESS_STATUS {
- return errors.New(strconv.Itoa(int(status.GetCode())) + ": " + status.GetMessage())
- }
- return nil
+// GetSessionId returns the session id stored on s; Open sets it from the
+// open-session response.
+func (s *Session) GetSessionId() int64 {
+ return s.sessionId
}
-func VerifySuccess(statuses []*rpc.TSStatus) error {
- for _, status := range statuses {
- if status.GetCode() != SUCCESS_STATUS {
- return errors.New(strconv.Itoa(int(status.GetCode())) + ": " + status.GetMessage())
- }
- }
- return nil
+// NewSession returns a Session that holds config. It only stores the
+// pointer; the connection is set up later by Open.
+func NewSession(config *Config) *Session {
+	session := new(Session)
+	session.config = config
+	return session
}
diff --git a/client/session_dataset.go b/client/session_dataset.go
deleted file mode 100644
index f075b8c..0000000
--- a/client/session_dataset.go
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package client
-
-import (
- "bytes"
- "encoding/binary"
- "errors"
- "github.com/apache/iotdb-client-go/rpc"
-)
-
-type SessionDataSet struct {
- Sql string
- ColumnNameList []string
- ColumnTypeList []string
- ColumnNameIndex map[string]int32
- QueryId int64
- SessionId int64
- IgnoreTimeStamp bool
- Client *rpc.TSIServiceClient
- QueryDataSet *rpc.TSQueryDataSet
- ioTDBRpcDataSet *IoTDBRpcDataSet
-}
-
-func NewSessionDataSet(dataSet *SessionDataSet) *SessionDataSet {
- var sessionDataSet = SessionDataSet{
- Sql: dataSet.Sql,
- ColumnNameList: dataSet.ColumnNameList,
- ColumnTypeList: dataSet.ColumnTypeList,
- ColumnNameIndex: dataSet.ColumnNameIndex,
- QueryId: dataSet.QueryId,
- SessionId: dataSet.SessionId,
- IgnoreTimeStamp: dataSet.IgnoreTimeStamp,
- Client: dataSet.Client,
- QueryDataSet: dataSet.QueryDataSet,
- }
- sessionDataSet.ioTDBRpcDataSet = NewIoTDBRpcDataSet(&sessionDataSet, DefaultFetchSize)
- return &sessionDataSet
-}
-
-func (s *SessionDataSet) getFetchSize() int32 {
-
- return s.ioTDBRpcDataSet.FetchSize
-}
-
-func (s *SessionDataSet) setFetchSize(fetchSize int32) {
- s.ioTDBRpcDataSet.FetchSize = fetchSize
-}
-
-func (s *SessionDataSet) GetColumnNames() []string {
- return s.ioTDBRpcDataSet.ColumnNameList
-}
-
-func (s *SessionDataSet) GetColumnTypes() []string {
- return s.ioTDBRpcDataSet.ColumnTypeList
-}
-
-func (s *SessionDataSet) HasNext() bool {
- return s.ioTDBRpcDataSet.next()
-}
-
-func (s *SessionDataSet) Next() (*RowRecord, error) {
- if !s.ioTDBRpcDataSet.HasCachedRecord && !s.HasNext() {
- return nil, nil
- }
- s.ioTDBRpcDataSet.HasCachedRecord = false
- return s.constructRowRecordFromValueArray()
-}
-
-func (s *SessionDataSet) constructRowRecordFromValueArray() (*RowRecord, error) {
- var outFields []Field
- var err error
- for i := 0; i < s.ioTDBRpcDataSet.getColumnSize(); i++ {
- var field Field
- var index = i + 1
- var datasetColumnIndex = i + StarIndex
- if s.ioTDBRpcDataSet.IgnoreTimeStamp {
- index--
- datasetColumnIndex--
- }
- var loc = s.ioTDBRpcDataSet.ColumnOrdinalMap[s.ioTDBRpcDataSet.ColumnNameList[index]] - StarIndex
- dataSetIsNil, err := s.ioTDBRpcDataSet.isNil(datasetColumnIndex)
- if err != nil {
- return nil, err
- }
- if !dataSetIsNil {
- valueBytes := s.ioTDBRpcDataSet.Values[loc]
- dataType := s.ioTDBRpcDataSet.ColumnTypeDeduplicatedList[loc]
- bytesBuffer := bytes.NewBuffer(valueBytes)
- field = NewField(dataType)
- switch dataType {
- case "BOOLEAN":
- var booleanValue bool
- binary.Read(bytesBuffer, binary.BigEndian, &booleanValue)
- field.SetBoolV(booleanValue)
- break
- case "INT32":
- var intValue int32
- binary.Read(bytesBuffer, binary.BigEndian, &intValue)
- field.SetIntV(intValue)
- break
- case "INT64":
- var longValue int64
- binary.Read(bytesBuffer, binary.BigEndian, &longValue)
- field.SetLongV(longValue)
- break
- case "FLOAT":
- var floatValue float32
- binary.Read(bytesBuffer, binary.BigEndian, &floatValue)
- field.SetFloatV(floatValue)
- break
- case "DOUBLE":
- var doubleValue float64
- binary.Read(bytesBuffer, binary.BigEndian, &doubleValue)
- field.SetDoubleV(doubleValue)
- break
- case "TEXT":
- field.SetBinaryV(valueBytes)
- break
- default:
- return nil, errors.New("unsupported data type " + dataType)
- }
- } else {
- field = NewField("")
- }
- outFields = append(outFields, field)
- }
- bytesBuffer := bytes.NewBuffer(s.ioTDBRpcDataSet.time)
- var timeStamp int64
- binary.Read(bytesBuffer, binary.BigEndian, &timeStamp)
- return &RowRecord{
- Timestamp: timeStamp,
- Fields: outFields,
- }, err
-}
-
-func (s *SessionDataSet) CloseOperationHandle() {
- s.ioTDBRpcDataSet.close()
-}
diff --git a/client/sessiondataset.go b/client/sessiondataset.go
new file mode 100644
index 0000000..69b11a2
--- /dev/null
+++ b/client/sessiondataset.go
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import "github.com/apache/iotdb-client-go/rpc"
+
+const (
+ TimestampColumnName = "Time"
+)
+
+// SessionDataSet is a thin public wrapper around an IoTDBRpcDataSet; every
+// method below forwards to the wrapped value.
+type SessionDataSet struct {
+ ioTDBRpcDataSet *IoTDBRpcDataSet
+}
+
+// Next forwards to IoTDBRpcDataSet.next and returns its (bool, error) result.
+func (s *SessionDataSet) Next() (bool, error) {
+ return s.ioTDBRpcDataSet.next()
+}
+
+// GetText forwards to IoTDBRpcDataSet.getText for the named column.
+func (s *SessionDataSet) GetText(columnName string) string {
+ return s.ioTDBRpcDataSet.getText(columnName)
+}
+
+// GetBool forwards to IoTDBRpcDataSet.getBool for the named column.
+func (s *SessionDataSet) GetBool(columnName string) bool {
+ return s.ioTDBRpcDataSet.getBool(columnName)
+}
+
+// Scan forwards all destinations to IoTDBRpcDataSet.scan.
+func (s *SessionDataSet) Scan(dest ...interface{}) error {
+ return s.ioTDBRpcDataSet.scan(dest...)
+}
+
+// GetFloat forwards to IoTDBRpcDataSet.getFloat for the named column.
+func (s *SessionDataSet) GetFloat(columnName string) float32 {
+ return s.ioTDBRpcDataSet.getFloat(columnName)
+}
+
+// GetDouble forwards to IoTDBRpcDataSet.getDouble for the named column.
+func (s *SessionDataSet) GetDouble(columnName string) float64 {
+ return s.ioTDBRpcDataSet.getDouble(columnName)
+}
+
+// GetInt32 forwards to IoTDBRpcDataSet.getInt32 for the named column.
+func (s *SessionDataSet) GetInt32(columnName string) int32 {
+ return s.ioTDBRpcDataSet.getInt32(columnName)
+}
+
+// GetInt64 forwards to IoTDBRpcDataSet.getInt64 for the named column.
+func (s *SessionDataSet) GetInt64(columnName string) int64 {
+ return s.ioTDBRpcDataSet.getInt64(columnName)
+}
+
+// GetTimestamp forwards to IoTDBRpcDataSet.GetTimestamp.
+func (s *SessionDataSet) GetTimestamp() int64 {
+ return s.ioTDBRpcDataSet.GetTimestamp()
+}
+
+// GetValue forwards to IoTDBRpcDataSet.getValue for the named column.
+func (s *SessionDataSet) GetValue(columnName string) interface{} {
+ return s.ioTDBRpcDataSet.getValue(columnName)
+}
+
+// GetRowRecord forwards to IoTDBRpcDataSet.getRowRecord.
+func (s *SessionDataSet) GetRowRecord() (*RowRecord, error) {
+ return s.ioTDBRpcDataSet.getRowRecord()
+}
+
+// GetColumnCount returns the wrapped data set's columnCount field.
+func (s *SessionDataSet) GetColumnCount() int {
+ return s.ioTDBRpcDataSet.columnCount
+}
+
+// GetColumnDataType returns columnTypeList[columnIndex] of the wrapped data
+// set. The index is not range-checked here.
+func (s *SessionDataSet) GetColumnDataType(columnIndex int) TSDataType {
+ return s.ioTDBRpcDataSet.columnTypeList[columnIndex]
+}
+
+// GetColumnName returns columnNameList[columnIndex] of the wrapped data set.
+// The index is not range-checked here.
+func (s *SessionDataSet) GetColumnName(columnIndex int) string {
+ return s.ioTDBRpcDataSet.columnNameList[columnIndex]
+}
+
+// GetColumnNames returns the wrapped data set's columnNameList slice itself
+// (no copy is made).
+func (s *SessionDataSet) GetColumnNames() []string {
+ return s.ioTDBRpcDataSet.columnNameList
+}
+
+// IsIgnoreTimeStamp returns the wrapped data set's ignoreTimeStamp field.
+func (s *SessionDataSet) IsIgnoreTimeStamp() bool {
+ return s.ioTDBRpcDataSet.ignoreTimeStamp
+}
+
+// IsClosed forwards to IoTDBRpcDataSet.IsClosed.
+func (s *SessionDataSet) IsClosed() bool {
+ return s.ioTDBRpcDataSet.IsClosed()
+}
+
+// Close forwards to IoTDBRpcDataSet.Close and returns its error.
+func (s *SessionDataSet) Close() error {
+ return s.ioTDBRpcDataSet.Close()
+}
+
+// NewSessionDataSet passes all of its arguments, unchanged and in the same
+// order, to NewIoTDBRpcDataSet and wraps the result in a SessionDataSet.
+func NewSessionDataSet(sql string, columnNameList []string, columnTypeList []string,
+ columnNameIndex map[string]int32,
+ queryId int64, client *rpc.TSIServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet,
+ ignoreTimeStamp bool, fetchSize int32) *SessionDataSet {
+
+ return &SessionDataSet{
+ ioTDBRpcDataSet: NewIoTDBRpcDataSet(sql, columnNameList, columnTypeList,
+ columnNameIndex,
+ queryId, client, sessionId, queryDataSet,
+ ignoreTimeStamp, fetchSize),
+ }
+}
diff --git a/client/tablet.go b/client/tablet.go
index 5cc9506..a8e078a 100644
--- a/client/tablet.go
+++ b/client/tablet.go
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -23,139 +23,173 @@ import (
"bytes"
"encoding/binary"
"errors"
- "sort"
- "strconv"
+ "fmt"
+ "reflect"
)
-type Tablet struct {
- DeviceId string
- Measurements []string
- Values []interface{}
- Timestamps []int64
- Types []int32
+type MeasurementSchema struct {
+ Measurement string
+ DataType TSDataType
+ Encoding TSEncoding
+ Compressor TSCompressionType
+ Properties map[string]string
}
-func (t *Tablet) GetRowNumber() int32 {
- return (int32)(len(t.Timestamps))
+type Tablet struct {
+ deviceId string
+ Schemas []*MeasurementSchema
+ timestamps []int64
+ values []interface{}
+ RowSize int
}
-func (t *Tablet) GetDeviceId() string {
- return t.DeviceId
+func (t *Tablet) SetTimestamp(timestamp int64, rowIndex int) {
+ t.timestamps[rowIndex] = timestamp
}
-func (t *Tablet) GetMeasurements() []string {
- return t.Measurements
-}
+// SetValueAt stores value in the cell at (columnIndex, rowIndex).
+//
+// value must match the data type declared by the column's schema. It may be
+// given directly (bool, int32, int64, float32, float64, string) or as a
+// pointer to that type; TEXT columns additionally accept []byte. An error is
+// returned when value is nil, when either index is out of range, or when the
+// dynamic type of value does not fit the column.
+func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error {
+	if value == nil {
+		return errors.New("Illegal argument value can't be nil")
+	}
-func (t *Tablet) GetBinaryTimestamps() []byte {
- buf := bytes.NewBuffer([]byte{})
- for i := 0; i < len(t.Timestamps); i++ {
- binary.Write(buf, binary.BigEndian, t.Timestamps[i])
+	// Valid columns are 0..len(t.Schemas)-1: the upper bound is exclusive.
+	if columnIndex < 0 || columnIndex >= len(t.Schemas) {
+		return fmt.Errorf("Illegal argument columnIndex %d", columnIndex)
}
- return buf.Bytes()
-}
-func (t *Tablet) GetBinaryValues() []byte {
- buf := bytes.NewBuffer([]byte{})
- for i := 0; i < len(t.Types); i++ {
- switch t.Types[i] {
- case 0:
- binary.Write(buf, binary.BigEndian, t.Values[i].([]bool))
- case 1:
- tmp := t.Values[i].([]int32)
- binary.Write(buf, binary.BigEndian, &tmp)
- case 2:
- tmp := t.Values[i].([]int64)
- binary.Write(buf, binary.BigEndian, &tmp)
- case 3:
- tmp := t.Values[i].([]float32)
- binary.Write(buf, binary.BigEndian, &tmp)
- case 4:
- tmp := t.Values[i].([]float64)
- binary.Write(buf, binary.BigEndian, &tmp)
- case 5:
- values := t.Values[i].([]string)
- for index := range values {
- tmp := (int32)(len(values[index]))
- binary.Write(buf, binary.BigEndian, &tmp)
- buf.WriteString(values[index])
- }
+	// Likewise valid rows are 0..t.RowSize-1.
+	if rowIndex < 0 || rowIndex >= t.RowSize {
+		return fmt.Errorf("Illegal argument rowIndex %d", rowIndex)
+	}
+	switch t.Schemas[columnIndex].DataType {
+	case BOOLEAN:
+		values := t.values[columnIndex].([]bool)
+		switch v := value.(type) {
+		case bool:
+			values[rowIndex] = v
+		case *bool:
+			values[rowIndex] = *v
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
+		}
+	case INT32:
+		values := t.values[columnIndex].([]int32)
+		switch v := value.(type) {
+		case int32:
+			values[rowIndex] = v
+		case *int32:
+			values[rowIndex] = *v
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
+		}
+	case INT64:
+		values := t.values[columnIndex].([]int64)
+		switch v := value.(type) {
+		case int64:
+			values[rowIndex] = v
+		case *int64:
+			values[rowIndex] = *v
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
+		}
+	case FLOAT:
+		values := t.values[columnIndex].([]float32)
+		switch v := value.(type) {
+		case float32:
+			values[rowIndex] = v
+		case *float32:
+			values[rowIndex] = *v
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
+		}
+	case DOUBLE:
+		values := t.values[columnIndex].([]float64)
+		switch v := value.(type) {
+		case float64:
+			values[rowIndex] = v
+		case *float64:
+			values[rowIndex] = *v
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
+		}
+	case TEXT:
+		// TEXT columns are backed by []string (see NewTablet); without this
+		// case text values were silently discarded.
+		values := t.values[columnIndex].([]string)
+		switch v := value.(type) {
+		case string:
+			values[rowIndex] = v
+		case *string:
+			values[rowIndex] = *v
+		case []byte:
+			values[rowIndex] = string(v)
+		default:
+			return fmt.Errorf("Illegal argument value %v %v", value, reflect.TypeOf(value))
}
}
- return buf.Bytes()
+	return nil
}
-func (t *Tablet) GetTypes() []int32 {
- return t.Types
+func (t *Tablet) GetTimestampBytes() []byte {
+ buff := &bytes.Buffer{}
+ for _, v := range t.timestamps {
+ binary.Write(buff, binary.BigEndian, v)
+ }
+ return buff.Bytes()
}
-func (t *Tablet) SortTablet() error {
- var timeIndexs = make(map[int64]int, t.GetRowNumber())
- for index := range t.Timestamps {
- timeIndexs[t.Timestamps[index]] = index
+func (t *Tablet) GetMeasurements() []string {
+ measurements := make([]string, len(t.Schemas))
+ for i, s := range t.Schemas {
+ measurements[i] = s.Measurement
}
- var keys []int64
- for timeValue := range timeIndexs {
- keys = append(keys, timeValue)
+ return measurements
+}
+
+func (t *Tablet) getDataTypes() []int32 {
+ types := make([]int32, len(t.Schemas))
+ for i, s := range t.Schemas {
+ types[i] = int32(s.DataType)
}
- sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
- t.Timestamps = keys
- for index := range t.Values {
- sortValue := sortList(t.Values[index], t.Types[index], timeIndexs, t.Timestamps)
- if sortValue != nil {
- t.Values[index] = sortValue
- } else {
- return errors.New("unsupported data type " + strconv.Itoa(int(t.Types[index])))
+ return types
+}
+
+func (t *Tablet) GetValuesBytes() ([]byte, error) {
+ buff := &bytes.Buffer{}
+ for i, schema := range t.Schemas {
+ switch schema.DataType {
+ case BOOLEAN:
+ binary.Write(buff, binary.BigEndian, t.values[i].([]bool))
+ case INT32:
+ binary.Write(buff, binary.BigEndian, t.values[i].([]int32))
+ case INT64:
+ binary.Write(buff, binary.BigEndian, t.values[i].([]int64))
+ case FLOAT:
+ binary.Write(buff, binary.BigEndian, t.values[i].([]float32))
+ case DOUBLE:
+ binary.Write(buff, binary.BigEndian, t.values[i].([]float64))
+ case TEXT:
+ for _, s := range t.values[i].([]string) {
+ binary.Write(buff, binary.BigEndian, int32(len(s)))
+ binary.Write(buff, binary.BigEndian, []byte(s))
+ }
+ default:
+ return nil, fmt.Errorf("Illegal datatype %v", schema.DataType)
}
}
- return nil
+ return buff.Bytes(), nil
}
-func sortList(valueList interface{}, dataType int32, timeIndexs map[int64]int, timeStamps []int64) interface{} {
- switch dataType {
- case 0:
- boolValues := valueList.([]bool)
- sortedValues := make([]bool, len(boolValues))
- for index := range sortedValues {
- sortedValues[index] = boolValues[timeIndexs[timeStamps[index]]]
- }
- return sortedValues
- case 1:
- intValues := valueList.([]int32)
- sortedValues := make([]int32, len(intValues))
- for index := range sortedValues {
- sortedValues[index] = intValues[timeIndexs[timeStamps[index]]]
- }
- return sortedValues
- case 2:
- longValues := valueList.([]int64)
- sortedValues := make([]int64, len(longValues))
- for index := range sortedValues {
- sortedValues[index] = longValues[timeIndexs[timeStamps[index]]]
- }
- return sortedValues
- case 3:
- floatValues := valueList.([]float32)
- sortedValues := make([]float32, len(floatValues))
- for index := range sortedValues {
- sortedValues[index] = floatValues[timeIndexs[timeStamps[index]]]
- }
- return sortedValues
- case 4:
- doubleValues := valueList.([]float64)
- sortedValues := make([]float64, len(doubleValues))
- for index := range sortedValues {
- sortedValues[index] = doubleValues[timeIndexs[timeStamps[index]]]
- }
- return sortedValues
- case 5:
- stringValues := valueList.([]string)
- sortedValues := make([]string, len(stringValues))
- for index := range sortedValues {
- sortedValues[index] = stringValues[timeIndexs[timeStamps[index]]]
+func NewTablet(deviceId string, schemas []*MeasurementSchema, size int) (*Tablet, error) {
+ tablet := &Tablet{
+ deviceId: deviceId,
+ Schemas: schemas,
+ RowSize: size,
+ }
+ tablet.timestamps = make([]int64, size)
+ tablet.values = make([]interface{}, len(schemas))
+ for i, schema := range tablet.Schemas {
+ switch schema.DataType {
+ case BOOLEAN:
+ tablet.values[i] = make([]bool, size)
+ case INT32:
+ tablet.values[i] = make([]int32, size)
+ case INT64:
+ tablet.values[i] = make([]int64, size)
+ case FLOAT:
+ tablet.values[i] = make([]float32, size)
+ case DOUBLE:
+ tablet.values[i] = make([]float64, size)
+ case TEXT:
+ tablet.values[i] = make([]string, size)
+ default:
+ return nil, fmt.Errorf("Illegal datatype %v", schema.DataType)
}
- return sortedValues
}
- return nil
+ return tablet, nil
}
diff --git a/client/tablet_test.go b/client/tablet_test.go
new file mode 100644
index 0000000..f47c365
--- /dev/null
+++ b/client/tablet_test.go
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "reflect"
+ "testing"
+)
+
+// createTablet builds a tablet for device "root.ln.TestDevice" with room for
+// size rows and one column per schema listed below (INT32, DOUBLE, INT64,
+// FLOAT, TEXT and BOOLEAN).
+func createTablet(size int) (*Tablet, error) {
+	schemas := []*MeasurementSchema{
+		{
+			Measurement: "restart_count",
+			DataType:    INT32,
+			Encoding:    RLE,
+			Compressor:  SNAPPY,
+			Properties:  map[string]string{"owner": "Mark Liu"},
+		},
+		{
+			Measurement: "price",
+			DataType:    DOUBLE,
+			Encoding:    GORILLA,
+			Compressor:  SNAPPY,
+		},
+		{
+			Measurement: "tick_count",
+			DataType:    INT64,
+			Encoding:    RLE,
+			Compressor:  SNAPPY,
+		},
+		{
+			Measurement: "temperature",
+			DataType:    FLOAT,
+			Encoding:    GORILLA,
+			Compressor:  SNAPPY,
+			Properties:  map[string]string{"owner": "Mark Liu"},
+		},
+		{
+			Measurement: "description",
+			DataType:    TEXT,
+			Encoding:    PLAIN,
+			Compressor:  SNAPPY,
+			Properties:  map[string]string{"owner": "Mark Liu"},
+		},
+		{
+			Measurement: "status",
+			DataType:    BOOLEAN,
+			Encoding:    RLE,
+			Compressor:  SNAPPY,
+			Properties:  map[string]string{"owner": "Mark Liu"},
+		},
+	}
+	return NewTablet("root.ln.TestDevice", schemas, size)
+}
+
+func TestTablet_getDataTypes(t *testing.T) {
+ type fields struct {
+ deviceId string
+ Schemas []*MeasurementSchema
+ timestamps []int64
+ values []interface{}
+ RowSize int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want []int32
+ }{
+ {
+ name: "",
+ fields: fields{
+ deviceId: "root.ln.device5",
+ Schemas: []*MeasurementSchema{
+ &MeasurementSchema{
+ Measurement: "restart_count",
+ DataType: INT32,
+ Encoding: RLE,
+ Compressor: SNAPPY,
+ Properties: map[string]string{
+ "owner": "Mark Liu",
+ },
+ }, &MeasurementSchema{
+ Measurement: "price",
+ DataType: DOUBLE,
+ Encoding: GORILLA,
+ Compressor: SNAPPY,
+ }, &MeasurementSchema{
+ Measurement: "tick_count",
+ DataType: INT64,
+ Encoding: RLE,
+ Compressor: SNAPPY,
+ }, &MeasurementSchema{
+ Measurement: "temperature",
+ DataType: FLOAT,
+ Encoding: GORILLA,
+ Compressor: SNAPPY,
+ Properties: map[string]string{
+ "owner": "Mark Liu",
+ },
+ }, &MeasurementSchema{
+ Measurement: "description",
+ DataType: TEXT,
+ Encoding: PLAIN,
+ Compressor: SNAPPY,
+ Properties: map[string]string{
+ "owner": "Mark Liu",
+ },
+ },
+ &MeasurementSchema{
+ Measurement: "status",
+ DataType: BOOLEAN,
+ Encoding: RLE,
+ Compressor: SNAPPY,
+ Properties: map[string]string{
+ "owner": "Mark Liu",
+ },
+ },
+ },
+ timestamps: []int64{},
+ values: []interface{}{},
+ RowSize: 0,
+ },
+ want: []int32{int32(INT32), int32(DOUBLE), int32(INT64), int32(FLOAT), int32(TEXT), int32(BOOLEAN)},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tablet := &Tablet{
+ deviceId: tt.fields.deviceId,
+ Schemas: tt.fields.Schemas,
+ timestamps: tt.fields.timestamps,
+ values: tt.fields.values,
+ RowSize: tt.fields.RowSize,
+ }
+ if got := tablet.getDataTypes(); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Tablet.getDataTypes() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// TestTablet_SetTimestamp checks that SetTimestamp writes the timestamp into
+// the requested row of a freshly created tablet.
+func TestTablet_SetTimestamp(t *testing.T) {
+	type args struct {
+		timestamp int64
+		rowIndex  int
+	}
+	tests := []struct {
+		name string
+		args args
+		want int64
+	}{
+		{
+			name: "",
+			args: args{
+				timestamp: 1608268702769,
+				rowIndex:  0,
+			},
+			want: 1608268702769,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tablet, err := createTablet(1)
+			if err != nil {
+				// Stop here: continuing would dereference a nil tablet.
+				t.Fatal(err)
+			}
+			tablet.SetTimestamp(tt.args.timestamp, tt.args.rowIndex)
+			// Read back the row that was written rather than a fixed index.
+			if got := tablet.timestamps[tt.args.rowIndex]; got != tt.want {
+				t.Errorf("Tablet.SetTimestamp(int64, int32) = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+// TestTablet_SetValueAt checks which (value, columnIndex, rowIndex)
+// combinations SetValueAt accepts on a one-row tablet built by createTablet.
+func TestTablet_SetValueAt(t *testing.T) {
+	type args struct {
+		value       interface{}
+		columnIndex int
+		rowIndex    int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{name: "nil", args: args{value: nil, columnIndex: 0, rowIndex: 0}, wantErr: true},
+		{name: "columnIndex-1", args: args{value: 0, columnIndex: -1, rowIndex: 0}, wantErr: true},
+		{name: "rowIndex-1", args: args{value: 0, columnIndex: 0, rowIndex: -1}, wantErr: true},
+		{name: "rowIndex-65535", args: args{value: 0, columnIndex: 0, rowIndex: 65535}, wantErr: true},
+		{name: "columnIndex-65535", args: args{value: 0, columnIndex: 65535, rowIndex: 0}, wantErr: true},
+		{name: "restart_count", args: args{value: int32(0), columnIndex: 0, rowIndex: 0}, wantErr: false},
+		{name: "price", args: args{value: float64(32.768), columnIndex: 1, rowIndex: 0}, wantErr: false},
+		{name: "tick_count", args: args{value: int64(1608268702780), columnIndex: 2, rowIndex: 0}, wantErr: false},
+		{name: "temperature", args: args{value: float32(36.5), columnIndex: 3, rowIndex: 0}, wantErr: false},
+		{name: "description", args: args{value: "Hello world!", columnIndex: 4, rowIndex: 0}, wantErr: false},
+		{name: "status", args: args{value: true, columnIndex: 5, rowIndex: 0}, wantErr: false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tablet, err := createTablet(1)
+			if err != nil {
+				// Stop here: continuing would call a method on a nil tablet.
+				t.Fatal(err)
+			}
+			if err := tablet.SetValueAt(tt.args.value, tt.args.columnIndex, tt.args.rowIndex); (err != nil) != tt.wantErr {
+				t.Errorf("Tablet.SetValueAt() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
diff --git a/client/utils.go b/client/utils.go
new file mode 100644
index 0000000..24367cf
--- /dev/null
+++ b/client/utils.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "strconv"
+
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+// int32ToString formats n as a base-10 string.
+func int32ToString(n int32) string {
+	return strconv.FormatInt(int64(n), 10)
+}
+
+func int64ToString(n int64) string {
+ return strconv.FormatInt(n, 10)
+}
+
+func float32ToString(val float32) string {
+ return strconv.FormatFloat(float64(val), 'f', -1, 32)
+}
+
+func float64ToString(val float64) string {
+ return strconv.FormatFloat(val, 'f', -1, 64)
+}
+
+// int32ToBytes returns the 4-byte big-endian encoding of n.
+func int32ToBytes(n int32) []byte {
+	out := make([]byte, 4)
+	binary.BigEndian.PutUint32(out, uint32(n))
+	return out
+}
+
+// int64ToBytes returns the 8-byte big-endian encoding of n.
+func int64ToBytes(n int64) []byte {
+	out := make([]byte, 8)
+	binary.BigEndian.PutUint64(out, uint64(n))
+	return out
+}
+
+// bytesToInt32 decodes the first four bytes of bys as a big-endian int32.
+// The read error is ignored, so input shorter than four bytes yields 0.
+func bytesToInt32(bys []byte) int32 {
+	var n int32
+	binary.Read(bytes.NewReader(bys), binary.BigEndian, &n)
+	return n
+}
+
+// bytesToInt64 decodes the first eight bytes of bys as a big-endian int64.
+// The read error is ignored, so input shorter than eight bytes yields 0.
+func bytesToInt64(bys []byte) int64 {
+	var n int64
+	binary.Read(bytes.NewReader(bys), binary.BigEndian, &n)
+	return n
+}
+
+// verifySuccesses inspects every status in statuses and returns
+// NewBatchError(statuses) as soon as one of them carries a code other than
+// SuccessStatus or NeedRedirection; it returns nil otherwise.
+func verifySuccesses(statuses []*rpc.TSStatus) error {
+	for _, status := range statuses {
+		// Only the code decides the outcome. Message is an optional pointer
+		// and is deliberately not dereferenced: a failing status without a
+		// message must produce an error, not a nil-pointer panic.
+		if status.Code != SuccessStatus && status.Code != NeedRedirection {
+			return NewBatchError(statuses)
+		}
+	}
+	return nil
+}
+
+// VerifySuccess converts an RPC status into a Go error.
+//
+// NeedRedirection and SuccessStatus yield nil. MultipleError defers to
+// verifySuccesses on the sub-statuses. Any other code yields an error that
+// carries the code and, when present, the status message.
+func VerifySuccess(status *rpc.TSStatus) error {
+	switch status.Code {
+	case NeedRedirection, SuccessStatus:
+		return nil
+	case MultipleError:
+		return verifySuccesses(status.GetSubStatus())
+	}
+	if status.Message == nil {
+		return fmt.Errorf("Error Code: %d", status.Code)
+	}
+	return fmt.Errorf("Error Code: %d, Message: %v", status.Code, *status.Message)
+}
diff --git a/client/utils_test.go b/client/utils_test.go
new file mode 100644
index 0000000..6b08e52
--- /dev/null
+++ b/client/utils_test.go
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "testing"
+
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+func Test_bytesToInt32(t *testing.T) {
+ type args struct {
+ bys []byte
+ }
+ tests := []struct {
+ name string
+ args args
+ want int32
+ }{
+ {
+ name: "",
+ args: args{
+ bys: int32ToBytes(65535),
+ },
+ want: 65535,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := bytesToInt32(tt.args.bys); got != tt.want {
+ t.Errorf("bytesToInt32() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_bytesToInt64(t *testing.T) {
+ type args struct {
+ bys []byte
+ }
+ tests := []struct {
+ name string
+ args args
+ want int64
+ }{
+ {
+ name: "",
+ args: args{
+ bys: int64ToBytes(1607237683018),
+ },
+ want: 1607237683018,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := bytesToInt64(tt.args.bys); got != tt.want {
+ t.Errorf("bytesToInt64() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_int32ToString(t *testing.T) {
+ type args struct {
+ n int32
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "Test Int32",
+ args: args{
+ n: 65535,
+ },
+ want: "65535",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := int32ToString(tt.args.n); got != tt.want {
+ t.Errorf("int32ToString() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_int64ToString(t *testing.T) {
+ type args struct {
+ n int64
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "Test Int64",
+ args: args{
+ n: 7684873721715404507,
+ },
+ want: "7684873721715404507",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := int64ToString(tt.args.n); got != tt.want {
+ t.Errorf("int64ToString() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_float32ToString(t *testing.T) {
+ type args struct {
+ val float32
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "Test Float32",
+ args: args{
+ val: 0.97800666,
+ },
+ want: "0.97800666",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := float32ToString(tt.args.val); got != tt.want {
+ t.Errorf("float32ToString() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// Test_float64ToString checks the shortest decimal rendering of a float64.
+func Test_float64ToString(t *testing.T) {
+	type args struct {
+		val float64
+	}
+	tests := []struct {
+		name string
+		args args
+		want string
+	}{
+		{
+			// Sub-test name corrected from the misspelled "Test Flota64".
+			name: "Test Float64",
+			args: args{
+				val: 0.39751212862981283,
+			},
+			want: "0.39751212862981283",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := float64ToString(tt.args.val); got != tt.want {
+				t.Errorf("float64ToString() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_verifySuccess(t *testing.T) {
+ type args struct {
+ status *rpc.TSStatus
+ }
+ var errMsg string = "error occurred"
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "NeedRedirection",
+ args: args{
+ status: &rpc.TSStatus{
+ Code: NeedRedirection,
+ Message: &errMsg,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ wantErr: false,
+ }, {
+ name: "SuccessStatus",
+ args: args{
+ status: &rpc.TSStatus{
+ Code: SuccessStatus,
+ Message: &errMsg,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ wantErr: false,
+ }, {
+ name: "MultipleError",
+ args: args{
+ status: &rpc.TSStatus{
+ Code: MultipleError,
+ Message: &errMsg,
+ SubStatus: []*rpc.TSStatus{
+ &rpc.TSStatus{
+ Code: ShutDownError,
+ Message: &errMsg,
+ },
+ },
+ },
+ },
+ wantErr: true,
+ }, {
+ name: "CloseOperationError",
+ args: args{
+ status: &rpc.TSStatus{
+ Code: CloseOperationError,
+ Message: &errMsg,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := VerifySuccess(tt.args.status); (err != nil) != tt.wantErr {
+ t.Errorf("VerifySuccess() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
+
+func Test_verifySuccesses(t *testing.T) {
+ type args struct {
+ statuses []*rpc.TSStatus
+ }
+ var internalServerError string = "InternalServerError"
+ var success string = "Success"
+ var needRedirection string = "NeedRedirection"
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "InternalServerError",
+ args: args{
+ statuses: []*rpc.TSStatus{
+ &rpc.TSStatus{
+ Code: InternalServerError,
+ Message: &internalServerError,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ },
+ wantErr: true,
+ }, {
+ name: "SuccessStatus",
+ args: args{
+ statuses: []*rpc.TSStatus{
+ &rpc.TSStatus{
+ Code: SuccessStatus,
+ Message: &success,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ },
+ wantErr: false,
+ },
+ {
+ name: "NeedRedirection",
+ args: args{
+ statuses: []*rpc.TSStatus{
+ &rpc.TSStatus{
+ Code: NeedRedirection,
+ Message: &needRedirection,
+ SubStatus: []*rpc.TSStatus{},
+ },
+ },
+ },
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := verifySuccesses(tt.args.statuses); (err != nil) != tt.wantErr {
+ t.Errorf("verifySuccesses() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
diff --git a/example/session_example.go b/example/session_example.go
index f1ac71d..f4daff1 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -21,76 +21,259 @@ package main
import (
"fmt"
+ "log"
+ "math/rand"
+ "time"
+
"github.com/apache/iotdb-client-go/client"
+ "github.com/apache/iotdb-client-go/rpc"
)
var session *client.Session
+// main opens a session to the IoTDB server at 127.0.0.1:6667 as user root and
+// runs every example helper in turn, with deleteTimeseries calls interleaved
+// between the examples. A failure to open the session, or to run the
+// device1 query, terminates the program through log.Fatal.
func main() {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
+ config := &client.Config{
+ Host: "127.0.0.1",
+ Port: "6667",
+ UserName: "root",
+ Password: "root",
+ }
session = client.NewSession(config)
- session.Open(false, 0)
- setStorageGroup()
- deleteStorageGroup()
- deleteStorageGroups()
- createTimeseries()
+ if err := session.Open(false, 0); err != nil {
+ log.Fatal(err)
+ }
+ defer session.Close()
+
+ setStorageGroup("root.ln1")
+ deleteStorageGroup("root.ln1")
+
+ setStorageGroup("root.ln1")
+ setStorageGroup("root.ln2")
+ deleteStorageGroups("root.ln1", "root.ln2")
+
+ createTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.sg1.dev1.status")
+
createMultiTimeseries()
- deleteTimeseries()
+ deleteTimeseries("root.sg1.dev1.temperature")
+
insertStringRecord()
+ deleteTimeseries("root.ln.wf02.wt02.hardware")
+
insertRecord()
+ deleteTimeseries("root.sg1.dev1.status")
+
insertRecords()
+ deleteTimeseries("root.sg1.dev1.status")
+
insertTablet()
+ if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1"); err == nil {
+ printDevice1(ds)
+ ds.Close()
+ } else {
+ log.Fatal(err)
+ }
+ // Every path must live under root.ln.device1, the device createTablet writes to.
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+
insertTablets()
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+
+ insertRecord()
deleteData()
+ deleteTimeseries("root.sg1.dev1.status")
+
setTimeZone()
- fmt.Println(getTimeZone())
+ // Print the zone only when the lookup succeeded; otherwise report the error.
+ if tz, err := getTimeZone(); err == nil {
+ fmt.Printf("TimeZone: %s", tz)
+ } else {
+ log.Println(err)
+ }
+
executeStatement()
executeQueryStatement()
executeRawDataQuery()
executeBatchStatement()
- session.Close()
+
+ deleteTimeseries("root.ln.wf02.wt02.s5")
}
-func setStorageGroup() {
- var storageGroupId = "root.ln1"
- session.SetStorageGroup(storageGroupId)
+// printDevice1 prints a result set as a tab-separated table, scanning each
+// row into six typed variables in the order restart_count, price, tick_count,
+// temperature, description, status.
+//
+// The header row starts with "Time" only when the data set carries
+// timestamps. A Scan failure ends the program through log.Fatal. Iteration
+// stops as soon as Next reports false or an error; that error is not reported.
+func printDevice1(sds *client.SessionDataSet) {
+	withTime := !sds.IsIgnoreTimeStamp()
+	if withTime {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for _, name := range sds.GetColumnNames() {
+		fmt.Printf("%s\t", name)
+	}
+	fmt.Println()
+
+	const separator = "\t\t"
+	for {
+		hasRow, err := sds.Next()
+		if err != nil || !hasRow {
+			break
+		}
+		if withTime {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+
+		// Scan targets, one per column. Per the original author's note, every
+		// IoTDB data type can also be scanned into a string variable, so each
+		// of these could be declared as string instead.
+		var (
+			restartCount int32
+			price        float64
+			tickCount    int64
+			temperature  float32
+			description  string
+			status       bool
+		)
+		if err := sds.Scan(&restartCount, &price, &tickCount, &temperature, &description, &status); err != nil {
+			log.Fatal(err)
+		}
+
+		for _, cell := range []interface{}{restartCount, price, tickCount, temperature, description, status} {
+			fmt.Printf("%v%s", cell, separator)
+		}
+		fmt.Println()
+	}
}
-func deleteStorageGroup() {
- var storageGroupId = "root.ln1"
- session.DeleteStorageGroup(storageGroupId)
+// printDataSet0 prints a result set as a tab-separated table, reading every
+// cell through the typed getter that matches the column's data type.
+//
+// The header row starts with "Time" only when the data set carries
+// timestamps. Columns whose data type is none of BOOLEAN, INT32, INT64, FLOAT,
+// DOUBLE or TEXT print only the separator. Iteration stops as soon as Next
+// reports false or an error; that error is not reported.
+func printDataSet0(sessionDataSet *client.SessionDataSet) {
+	showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+		fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+	}
+	fmt.Println()
+
+	for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName))
+		}
+		for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+			columnName := sessionDataSet.GetColumnName(i)
+			// Go switch cases do not fall through, so no break statements are needed.
+			switch sessionDataSet.GetColumnDataType(i) {
+			case client.BOOLEAN:
+				fmt.Print(sessionDataSet.GetBool(columnName))
+			case client.INT32:
+				fmt.Print(sessionDataSet.GetInt32(columnName))
+			case client.INT64:
+				fmt.Print(sessionDataSet.GetInt64(columnName))
+			case client.FLOAT:
+				fmt.Print(sessionDataSet.GetFloat(columnName))
+			case client.DOUBLE:
+				fmt.Print(sessionDataSet.GetDouble(columnName))
+			case client.TEXT:
+				fmt.Print(sessionDataSet.GetText(columnName))
+			}
+			fmt.Print("\t\t")
+		}
+		fmt.Println()
+	}
}
-func deleteStorageGroups() {
- var storageGroupId = []string{"root.ln1"}
- session.DeleteStorageGroups(storageGroupId)
+// printDataSet1 prints a result set as a tab-separated table, fetching every
+// cell with GetValue and printing "null" for cells whose value is nil.
+//
+// The header row starts with "Time" only when the data set carries
+// timestamps. Iteration stops as soon as Next reports false or an error; that
+// error is not reported.
+func printDataSet1(sds *client.SessionDataSet) {
+	withTime := !sds.IsIgnoreTimeStamp()
+	if withTime {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for col := 0; col < sds.GetColumnCount(); col++ {
+		fmt.Printf("%s\t", sds.GetColumnName(col))
+	}
+	fmt.Println()
+
+	for {
+		hasRow, err := sds.Next()
+		if err != nil || !hasRow {
+			break
+		}
+		if withTime {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+		for col := 0; col < sds.GetColumnCount(); col++ {
+			cell := sds.GetValue(sds.GetColumnName(col))
+			if cell == nil {
+				fmt.Printf("%v\t\t", "null")
+				continue
+			}
+			fmt.Printf("%v\t\t", cell)
+		}
+		fmt.Println()
+	}
}
-func createTimeseries() {
+// printDataSet2 prints a result set as a tab-separated table, reading each row
+// as a row record and printing "null" for fields that report IsNull.
+//
+// The header row starts with "Time" only when the data set carries
+// timestamps. When GetRowRecord fails for a row, the partially printed line is
+// terminated, the error is logged and printing continues with the next row.
+// Iteration stops as soon as Next reports false or an error; that error is
+// not reported.
+func printDataSet2(sds *client.SessionDataSet) {
+	showTimestamp := !sds.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for i := 0; i < sds.GetColumnCount(); i++ {
+		fmt.Printf("%s\t", sds.GetColumnName(i))
+	}
+	fmt.Println()
+
+	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+
+		record, err := sds.GetRowRecord()
+		if err != nil {
+			// Do not drop the failure silently: close the line that may
+			// already hold the timestamp, then report the error.
+			fmt.Println()
+			log.Println(err)
+			continue
+		}
+		for _, field := range record.GetFields() {
+			v := field.GetValue()
+			if field.IsNull() {
+				v = "null"
+			}
+			fmt.Printf("%v\t\t", v)
+		}
+		fmt.Println()
+	}
+}
+
+// setStorageGroup calls session.SetStorageGroup for sg and hands the returned
+// status and error to checkError.
+func setStorageGroup(sg string) {
+ checkError(session.SetStorageGroup(sg))
+}
+
+// deleteStorageGroup calls session.DeleteStorageGroup for sg and hands the
+// returned status and error to checkError.
+func deleteStorageGroup(sg string) {
+ checkError(session.DeleteStorageGroup(sg))
+}
+
+// deleteStorageGroups forwards all of sgs to session.DeleteStorageGroups in a
+// single call and hands the returned status and error to checkError.
+func deleteStorageGroups(sgs ...string) {
+ checkError(session.DeleteStorageGroups(sgs...))
+}
+
+// createTimeseries calls session.CreateTimeseries for path with data type
+// FLOAT, encoding PLAIN and compressor SNAPPY, passing nil for the two
+// trailing arguments, and hands the result to checkError.
+func createTimeseries(path string) {
var (
- path = "root.sg1.dev1.status"
dataType = client.FLOAT
encoding = client.PLAIN
compressor = client.SNAPPY
)
- session.CreateTimeseries(path, dataType, encoding, compressor)
+ checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
+// createMultiTimeseries calls session.CreateMultiTimeseries with a single
+// entry: path root.sg1.dev1.temperature, data type TEXT, encoding PLAIN and
+// compressor SNAPPY. The result is handed to checkError.
func createMultiTimeseries() {
var (
paths = []string{"root.sg1.dev1.temperature"}
- dataTypes = []int32{client.TEXT}
- encodings = []int32{client.PLAIN}
- compressors = []int32{client.SNAPPY}
+ dataTypes = []client.TSDataType{client.TEXT}
+ encodings = []client.TSEncoding{client.PLAIN}
+ compressors = []client.TSCompressionType{client.SNAPPY}
)
- session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors)
+ checkError(session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors))
}
-func deleteTimeseries() {
- var paths = []string{"root.sg1.dev1.status"}
- session.DeleteTimeseries(paths)
+// deleteTimeseries passes all of paths as one slice to
+// session.DeleteTimeseries and hands the returned status and error to
+// checkError.
+func deleteTimeseries(paths ...string) {
+ checkError(session.DeleteTimeseries(paths))
}
func insertStringRecord() {
@@ -100,7 +283,7 @@ func insertStringRecord() {
values = []string{"123"}
timestamp int64 = 12
)
- session.InsertStringRecord(deviceId, measurements, values, timestamp)
+ checkError(session.InsertStringRecord(deviceId, measurements, values, timestamp))
}
func insertRecord() {
@@ -108,21 +291,21 @@ func insertRecord() {
deviceId = "root.sg1.dev1"
measurements = []string{"status"}
values = []interface{}{"123"}
- dataTypes = []int32{client.TEXT}
+ dataTypes = []client.TSDataType{client.TEXT}
timestamp int64 = 12
)
- session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp)
+ checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
}
+// insertRecords calls session.InsertRecords with one record: device
+// root.sg1.dev1, measurement status, a TEXT value "123" and timestamp 12.
+// The result is handed to checkError.
func insertRecords() {
var (
deviceId = []string{"root.sg1.dev1"}
measurements = [][]string{{"status"}}
- dataTypes = [][]int32{{client.TEXT}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}}
values = [][]interface{}{{"123"}}
timestamp = []int64{12}
)
- session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp)
+ checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
}
func deleteData() {
@@ -131,64 +314,83 @@ func deleteData() {
startTime int64 = 0
endTime int64 = 12
)
- session.DeleteData(paths, startTime, endTime)
+ checkError(session.DeleteData(paths, startTime, endTime))
}
+// insertTablet builds a 12-row tablet with createTablet, sends it through
+// session.InsertTablet and hands the result to checkError. A createTablet
+// failure ends the program through log.Fatal.
func insertTablet() {
- var (
- deviceId = "root.sg1.dev1"
- measurements = []string{"status", "tem"}
- dataTypes = []int32{client.INT32, client.INT32}
- values = make([]interface{}, 2)
- timestamp = []int64{154, 123}
- )
- values[0] = []int32{777, 6666}
- values[1] = []int32{888, 999}
- var tablet = client.Tablet{
- DeviceId: deviceId,
- Measurements: measurements,
- Values: values,
- Timestamps: timestamp,
- Types: dataTypes,
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertTablet(tablet)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
}
- session.InsertTablet(tablet)
+}
+
+// createTablet builds a tablet for device root.ln.device1 with six
+// measurements (restart_count, price, tick_count, temperature, description,
+// status) and fills rowCount rows with generated data.
+//
+// Row timestamps are in milliseconds: the first is one millisecond after the
+// current UTC time and each following row adds one more. The numeric columns
+// take values from math/rand, description is "Test Device <row+1>" and status
+// is true when the row's timestamp is even. An error from client.NewTablet is
+// returned together with a nil tablet.
+func createTablet(rowCount int) (*client.Tablet, error) {
+	schemas := []*client.MeasurementSchema{
+		{Measurement: "restart_count", DataType: client.INT32, Encoding: client.RLE, Compressor: client.SNAPPY},
+		{Measurement: "price", DataType: client.DOUBLE, Encoding: client.GORILLA, Compressor: client.SNAPPY},
+		{Measurement: "tick_count", DataType: client.INT64, Encoding: client.RLE, Compressor: client.SNAPPY},
+		{Measurement: "temperature", DataType: client.FLOAT, Encoding: client.GORILLA, Compressor: client.SNAPPY},
+		{Measurement: "description", DataType: client.TEXT, Encoding: client.PLAIN, Compressor: client.SNAPPY},
+		{Measurement: "status", DataType: client.BOOLEAN, Encoding: client.RLE, Compressor: client.SNAPPY},
+	}
+
+	tablet, err := client.NewTablet("root.ln.device1", schemas, rowCount)
+	if err != nil {
+		return nil, err
+	}
+
+	// Current time in milliseconds; row r is stamped base+r+1.
+	base := time.Now().UTC().UnixNano() / 1000000
+	for row := 0; row < rowCount; row++ {
+		stamp := base + int64(row) + 1
+		tablet.SetTimestamp(stamp, row)
+		// Column indexes follow the order of schemas above.
+		tablet.SetValueAt(rand.Int31(), 0, row)
+		tablet.SetValueAt(rand.Float64(), 1, row)
+		tablet.SetValueAt(rand.Int63(), 2, row)
+		tablet.SetValueAt(rand.Float32(), 3, row)
+		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+		tablet.SetValueAt(stamp%2 == 0, 5, row)
+	}
+	return tablet, nil
}
+// insertTablets builds two tablets with createTablet (8 rows and 4 rows),
+// sends both in one session.InsertTablets call and hands the result to
+// checkError. A createTablet failure ends the program through log.Fatal.
func insertTablets() {
- var (
- deviceId1 = "root.sg1.dev1"
- measurements1 = []string{"status", "tem"}
- dataTypes1 = []int32{client.INT32, client.INT32}
- values1 = make([]interface{}, 2)
- timestamp1 = []int64{154, 123}
- )
- values1[0] = []int32{777, 6666}
- values1[1] = []int32{888, 999}
- var tablet1 = client.Tablet{
- DeviceId: deviceId1,
- Measurements: measurements1,
- Values: values1,
- Timestamps: timestamp1,
- Types: dataTypes1,
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
}
- var (
- deviceId2 = "root.sg1.dev2"
- measurements2 = []string{"status", "tem"}
- dataTypes2 = []int32{client.INT32, client.INT32}
- values2 = make([]interface{}, 2)
- timestamp2 = []int64{154, 123}
- )
- values2[0] = []int32{777, 6666}
- values2[1] = []int32{888, 999}
- var tablet2 = client.Tablet{
- DeviceId: deviceId2,
- Measurements: measurements2,
- Values: values2,
- Timestamps: timestamp2,
- Types: dataTypes2,
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
}
- tablets := []client.Tablet{tablet1, tablet2}
- session.InsertTablets(tablets)
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ checkError(session.InsertTablets(tablets))
}
func setTimeZone() {
@@ -203,48 +405,22 @@ func getTimeZone() (string, error) {
+// executeStatement runs "show storage group" through session.ExecuteStatement.
+// On success it prints the result with printDataSet0 and closes the data set;
+// on failure it logs the error.
func executeStatement() {
var sql = "show storage group"
sessionDataSet, err := session.ExecuteStatement(sql)
- if err != nil {
- fmt.Println(err)
- }
- for i := 0; i < len(sessionDataSet.GetColumnNames()); i++ {
- println(sessionDataSet.GetColumnNames()[i])
- }
- for {
- if sessionDataSet.HasNext() {
- record, err := sessionDataSet.Next()
- if err != nil {
- fmt.Println(err)
- }
- for i := 0; i < len(record.Fields); i++ {
- println(record.Fields[i].GetStringValue())
- }
- } else {
- break
- }
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
}
}
+// executeQueryStatement runs "select count(s3) from root.sg1.dev1" through
+// session.ExecuteQueryStatement. On success it prints the result with
+// printDataSet1 and closes the data set; on failure it logs the error.
func executeQueryStatement() {
var sql = "select count(s3) from root.sg1.dev1"
sessionDataSet, err := session.ExecuteQueryStatement(sql)
- if err != nil {
- fmt.Println(err)
- }
- for i := 0; i < len(sessionDataSet.GetColumnNames()); i++ {
- println(sessionDataSet.GetColumnNames()[i])
- }
- for {
- if sessionDataSet.HasNext() {
- record, err := sessionDataSet.Next()
- if err != nil {
- fmt.Println(err)
- }
- for i := 0; i < len(record.Fields); i++ {
- println(record.Fields[i].GetLongV())
- }
- } else {
- break
- }
+ if err == nil {
+ printDataSet1(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
}
}
@@ -256,50 +432,28 @@ func executeRawDataQuery() {
endTime int64 = 200
)
sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
- if err != nil {
- fmt.Println(err)
- }
- for i := 0; i < len(sessionDataSet.GetColumnNames()); i++ {
- println(sessionDataSet.GetColumnNames()[i])
- }
- for {
- if sessionDataSet.HasNext() {
- record, err := sessionDataSet.Next()
- if err != nil {
- fmt.Println(err)
- }
- println(record.Timestamp)
- for i := 0; i < len(record.Fields); i++ {
- switch record.Fields[i].DataType {
- case "BOOLEAN":
- println(record.Fields[i].GetBoolV())
- break
- case "INT32":
- println(record.Fields[i].GetIntV())
- break
- case "INT64":
- println(record.Fields[i].GetLongV())
- break
- case "FLOAT":
- println(record.Fields[i].GetFloatV())
- break
- case "DOUBLE":
- println(record.Fields[i].GetDoubleV())
- break
- case "TEXT":
- println(string(record.Fields[i].GetBinaryV()))
- break
- }
-
- }
- } else {
- break
- }
+ if err == nil {
+ printDataSet2(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
}
}
+// executeBatchStatement sends two insert statements for root.ln.wf02.wt02
+// (s5 = true at time 1 and time 2) in one session.ExecuteBatchStatement call
+// and hands the result to checkError.
func executeBatchStatement() {
var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
"insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
- session.ExecuteBatchStatement(sqls)
+ checkError(session.ExecuteBatchStatement(sqls))
+}
+
+// checkError handles the (status, error) pair returned by a session call.
+//
+// A non-nil err ends the program through log.Fatal. Otherwise a non-nil
+// status is checked with client.VerifySuccess and any resulting error is
+// logged without stopping the program. A nil status is ignored.
+func checkError(status *rpc.TSStatus, err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
+	if status == nil {
+		return
+	}
+
+	err = client.VerifySuccess(status)
+	if err != nil {
+		log.Println(err)
+	}
}
diff --git a/go.mod b/go.mod
index 2d0b8b4..ea91d7b 100644
--- a/go.mod
+++ b/go.mod
@@ -1,7 +1,5 @@
module github.com/apache/iotdb-client-go
-go 1.14
+go 1.13
-require (
- github.com/apache/thrift v0.13.0
-)
+require github.com/apache/thrift v0.13.0
diff --git a/go.sum b/go.sum
index 83f27d3..171755b 100644
--- a/go.sum
+++ b/go.sum
@@ -1 +1,2 @@
+github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
diff --git a/test/session_test.go b/test/session_test.go
deleted file mode 100644
index bbb4b93..0000000
--- a/test/session_test.go
+++ /dev/null
@@ -1,579 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package test
-
-import (
- "fmt"
- "github.com/apache/iotdb-client-go/client"
- "strings"
- "testing"
-)
-
-func TestSetStorageGroup(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- storageGroupIds := []string{"root.sg1", "root.sg2"}
- for i := 0; i < 2; i++ {
- var (
- actually = false
- expected = true
- )
- session.SetStorageGroup(storageGroupIds[i])
- sessionDataSet, _ := session.ExecuteStatement("show storage group")
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if string(record.Fields[0].GetBinaryV()) == storageGroupIds[i] {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != expected {
- t.Errorf("SetStorageGroup: [%v], actually: [%v]", storageGroupIds[i], actually)
- }
- }
- session.Close()
-}
-
-func TestDeleteStorageGroup(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- var (
- actually = true
- expected = true
- storageGroupId = "root.sg1"
- )
- session.DeleteStorageGroup(storageGroupId)
- sessionDataSet, _ := session.ExecuteStatement("show storage group")
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if string(record.Fields[0].GetBinaryV()) == "root.sg1" {
- actually = false
- break
- }
- } else {
- break
- }
- }
- if actually != expected {
- t.Errorf("DeleteStorageGroup: [%v], actually: [%v]", storageGroupId, actually)
- }
- session.Close()
-}
-
-func TestDeleteStorageGroups(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- var (
- actually = true
- expected = true
- storageGroupIds = []string{"root.sg2"}
- )
- session.DeleteStorageGroups(storageGroupIds)
- sessionDataSet, _ := session.ExecuteStatement("show storage group")
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if string(record.Fields[0].GetBinaryV()) == "root.sg1" {
- actually = false
- break
- }
- } else {
- break
- }
- }
- sg := strings.Replace(strings.Trim(fmt.Sprint(storageGroupIds), "[]"), " ", ",", -1)
- if actually != expected {
- t.Errorf("DeleteStorageGroups: [%v], actually: [%v]", sg, actually)
- }
- session.Close()
-}
-
-func TestCreateTimeseries(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- path string
- dataType int32
- encoding int32
- compressor int32
- expected bool
- }{
- {"root.sg1.dev1.status", client.FLOAT, client.PLAIN, client.SNAPPY, true},
- } {
- session.CreateTimeseries(unit.path, unit.dataType, unit.encoding, unit.compressor)
- if actually := session.CheckTimeseriesExists(unit.path); actually != unit.expected {
- t.Errorf("CreateTimeseries: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestCreateMultiTimeseries(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- paths []string
- dataTypes []int32
- encodings []int32
- compressors []int32
- expected bool
- }{
- {[]string{"root.sg1.dev1.temperature"}, []int32{client.FLOAT}, []int32{client.PLAIN},
- []int32{client.SNAPPY}, true},
- } {
- session.CreateMultiTimeseries(unit.paths, unit.dataTypes, unit.encodings, unit.compressors)
- for _, path := range unit.paths {
- if actually := session.CheckTimeseriesExists(path); actually != unit.expected {
- t.Errorf("CreateTimeseries: [%v], actually: [%v]", unit, actually)
- }
- }
- }
- session.Close()
-}
-
-func TestDeleteTimeseries(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- paths []string
- expected bool
- }{
- {[]string{"root.sg1.dev1.status"}, false},
- } {
- session.DeleteTimeseries(unit.paths)
- for _, path := range unit.paths {
- if actually := session.CheckTimeseriesExists(path); actually != unit.expected {
- t.Errorf("DeleteTimeseries: [%v], actually: [%v]", unit, actually)
- }
- }
- }
- session.Close()
-}
-
-func TestInsertStringRecord(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- deviceId string
- measurements []string
- values []string
- timestamp int64
- expected bool
- }{
- {"root.sg1.dev1", []string{"s"}, []string{"tem"}, 111, true},
- } {
- session.InsertStringRecord(unit.deviceId, unit.measurements, unit.values, unit.timestamp)
- sessionDataSet, _ := session.ExecuteStatement("select count(s) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 1 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("InsertStringRecord: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestDeleteData(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- paths []string
- startTime int64
- endTime int64
- expected bool
- }{
- {[]string{"root.sg1.dev1.temperature"}, 0, 111, true},
- } {
- session.DeleteData(unit.paths, unit.startTime, unit.endTime)
- sessionDataSet, _ := session.ExecuteStatement("select count(temperature) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 0 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("DeleteData: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestGetTimeZone(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- if actually, _ := session.GetTimeZone(); actually != client.DefaultZoneId {
- t.Errorf("GetTimeZone, actually: [%v]", actually)
- }
- session.Close()
-}
-
-func TestSetTimeZone(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- timeZone string
- expected string
- }{
- {"GMT", "GMT"},
- } {
- session.SetTimeZone(unit.timeZone)
- if actually, _ := session.GetTimeZone(); actually != unit.expected {
- t.Errorf("SetTimeZone: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestInsertRecord(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- var val int32 = 2
- for _, unit := range []struct {
- deviceId string
- measurements []string
- dataTypes []int32
- values []interface{}
- timestamp int64
- expected bool
- }{
- {"root.sg1.dev1", []string{"wind"}, []int32{client.INT32},
- []interface{}{val}, 111, true},
- } {
- session.InsertRecord(unit.deviceId, unit.measurements, unit.dataTypes, unit.values, unit.timestamp)
- sessionDataSet, _ := session.ExecuteStatement("select count(wind) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 1 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("InsertRecord: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestInsertRecords(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- var val int32 = 2
- for _, unit := range []struct {
- deviceIds []string
- measurements [][]string
- dataTypes [][]int32
- values [][]interface{}
- timestamps []int64
- expected bool
- }{
- {[]string{"root.sg1.dev1"}, [][]string{{"height"}}, [][]int32{{client.INT32}},
- [][]interface{}{{val}}, []int64{222}, true},
- } {
- session.InsertRecords(unit.deviceIds, unit.measurements, unit.dataTypes, unit.values, unit.timestamps)
- sessionDataSet, _ := session.ExecuteStatement("select count(height) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 1 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("InsertRecords: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestInsertTablet(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- var (
- deviceId = "root.sg1.dev1"
- measurements = []string{"s1", "s2"}
- dataTypes = []int32{client.INT32, client.INT32}
- values = make([]interface{}, 2)
- timestamp = []int64{154, 123}
- )
- values[0] = []int32{777, 6666}
- values[1] = []int32{888, 999}
- var tablet = client.Tablet{
- DeviceId: deviceId,
- Measurements: measurements,
- Values: values,
- Timestamps: timestamp,
- Types: dataTypes,
- }
- session.Open(false, 0)
- for _, unit := range []struct {
- tablet client.Tablet
- expected bool
- }{
- {tablet, true},
- } {
- session.InsertTablet(unit.tablet)
- sessionDataSet, _ := session.ExecuteStatement("select count(s2) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 2 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("InsertTablet: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestInsertTablets(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- var (
- deviceId1 = "root.sg1.dev1"
- measurements1 = []string{"s3", "s4"}
- dataTypes1 = []int32{client.TEXT, client.INT32}
- values1 = make([]interface{}, 2)
- timestamp1 = []int64{154, 123}
- )
- values1[0] = []string{"aaa", "bbb"}
- values1[1] = []int32{888, 999}
- var tablet1 = client.Tablet{
- DeviceId: deviceId1,
- Measurements: measurements1,
- Values: values1,
- Timestamps: timestamp1,
- Types: dataTypes1,
- }
- var (
- deviceId2 = "root.sg1.dev2"
- measurements2 = []string{"status", "tem"}
- dataTypes2 = []int32{client.INT32, client.INT32}
- values2 = make([]interface{}, 2)
- timestamp2 = []int64{154, 123}
- )
- values2[0] = []int32{777, 6666}
- values2[1] = []int32{888, 999}
- var tablet2 = client.Tablet{
- DeviceId: deviceId2,
- Measurements: measurements2,
- Values: values2,
- Timestamps: timestamp2,
- Types: dataTypes2,
- }
- tablets := []client.Tablet{tablet1, tablet2}
- session.Open(false, 0)
- for _, unit := range []struct {
- tablets []client.Tablet
- expected bool
- }{
- {tablets, true},
- } {
- session.InsertTablets(unit.tablets)
- sessionDataSet, _ := session.ExecuteQueryStatement("select count(s3) from root.sg1.dev1")
- var actually = false
- for {
- if sessionDataSet.HasNext() {
- record, _ := sessionDataSet.Next()
- if record.Fields[0].GetLongV() == 2 {
- actually = true
- break
- }
- } else {
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("InsertTablets: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestExecuteStatement(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- for _, unit := range []struct {
- sql string
- expected bool
- }{
- {"create timeseries root.ln.wf02.wt02.s5 with datatype=BOOLEAN,encoding=PLAIN", true},
- } {
- session.ExecuteStatement(unit.sql)
- if actually := session.CheckTimeseriesExists("root.ln.wf02.wt02.s5"); actually != unit.expected {
- t.Errorf("ExecuteStatement: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestExecuteRawDataQuery(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)")
- var start int64 = 1
- var end int64 = 2
- for _, unit := range []struct {
- paths []string
- startTime int64
- endTime int64
- expected bool
- }{
- {[]string{"root.ln.wf02.wt02.s5"}, start, end, true},
- } {
- sessionDataSet, _ := session.ExecuteRawDataQuery(unit.paths, unit.startTime, unit.endTime)
- count := 0
- actually := false
- for {
- if sessionDataSet.HasNext() {
- count++
- } else {
- if count == 1 {
- actually = true
- }
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("ExecuteRawDataQuery: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}
-
-func TestExecuteBatchStatement(t *testing.T) {
- config := client.NewConfig()
- config.Host = "127.0.0.1"
- config.Port = "6667"
- session := client.NewSession(config)
- session.Open(false, 0)
- var statements = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
- "insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
- for _, unit := range []struct {
- sqls []string
- expected bool
- }{
- {statements, true},
- } {
- session.ExecuteBatchStatement(unit.sqls)
- sessionDataSet, _ := session.ExecuteQueryStatement("select s5 from root.ln.wf02.wt02")
- count := 0
- actually := false
- for {
- if sessionDataSet.HasNext() {
- count++
- } else {
- if count == 2 {
- actually = true
- }
- break
- }
- }
- if actually != unit.expected {
- t.Errorf("ExecuteBatchStatement: [%v], actually: [%v]", unit, actually)
- }
- }
- session.Close()
-}