You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/25 02:53:23 UTC
[iotdb-client-go] 01/01: Support execute non query statement
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch nonquery
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
commit c6c38b59491a98b9c2d55710e27b64a58c1b335a
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 25 10:52:03 2022 +0800
Support execute non query statement
---
client/session.go | 19 +++++
example/session_example.go | 7 ++
test/e2e/e2e_test.go | 204 +--------------------------------------------
3 files changed, 27 insertions(+), 203 deletions(-)
diff --git a/client/session.go b/client/session.go
index 01febb9..8bd08be 100644
--- a/client/session.go
+++ b/client/session.go
@@ -439,6 +439,25 @@ func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
return s.genDataSet(sql, resp), err
}
+func (s *Session) ExecuteNonQueryStatement(sql string) (r *rpc.TSStatus, err error) {
+ request := rpc.TSExecuteStatementReq{
+ SessionId: s.sessionId,
+ Statement: sql,
+ StatementId: s.requestStatementId,
+ FetchSize: &s.config.FetchSize,
+ }
+ resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+ if err != nil && resp == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ resp, err = s.client.ExecuteStatement(context.Background(), &request)
+ }
+ }
+
+ return resp.Status, err
+}
+
func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
diff --git a/example/session_example.go b/example/session_example.go
index 5ce8cf9..6af3cb9 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -67,6 +67,9 @@ func main() {
createTimeseries("root.sg1.dev1.status")
deleteTimeseries("root.sg1.dev1.status")
+ createTimeseriesByNonQueryStatement("create timeseries root.sg1.dev1.status with datatype = int32")
+ deleteTimeseries("root.sg1.dev1.status")
+
createMultiTimeseries()
deleteTimeseries("root.sg1.dev1.temperature")
@@ -295,6 +298,10 @@ func createTimeseries(path string) {
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
+func createTimeseriesByNonQueryStatement(sql string) {
+ checkError(session.ExecuteNonQueryStatement(sql))
+}
+
func createAlignedTimeseries(prefixPath string, measurements, measurementAlias []string) {
var (
dataTypes = []client.TSDataType{
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index beebf76..047df8b 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -20,9 +20,6 @@
package e2e
import (
- "fmt"
- "log"
- "math/rand"
"testing"
"time"
@@ -42,7 +39,7 @@ func TestE2ETestSuite(t *testing.T) {
func (s *e2eTestSuite) SetupSuite() {
config := &client.Config{
- Host: "iotdb",
+ Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "root",
@@ -94,40 +91,6 @@ func (s *e2eTestSuite) Test_CreateTimeseries() {
assert.Equal(timeseries, "root.tsg1.dev1.status")
}
-func (s *e2eTestSuite) Test_CreateAlignedTimeseries() {
- var (
- prefixPath = "root.tsg1.dev1"
- measurements = []string{"status", "temperature"}
- measurementAlias = []string{"sts", "temp"}
- dataTypes = []client.TSDataType{
- client.FLOAT,
- client.FLOAT,
- }
- encodings = []client.TSEncoding{
- client.PLAIN,
- client.PLAIN,
- }
- compressors = []client.TSCompressionType{
- client.LZ4,
- client.LZ4,
- }
- )
- s.checkError(s.session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
- for i := range measurements {
- fullPath := fmt.Sprintf("root.tsg1.dev1.%s", measurements[i])
- ds, err := s.session.ExecuteQueryStatement(fmt.Sprintf("show timeseries %s", fullPath), nil)
-
- assert := s.Require()
-
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var timeseries string
- assert.NoError(ds.Scan(×eries))
- assert.Equal(timeseries, fullPath)
- }
-}
-
func (s *e2eTestSuite) Test_InsertRecords() {
var (
deviceId = []string{"root.tsg1.dev1"}
@@ -146,170 +109,5 @@ func (s *e2eTestSuite) Test_InsertRecords() {
var status string
assert.NoError(ds.Scan(&status))
assert.Equal(status, "Working")
-}
-
-func (s *e2eTestSuite) Test_InsertAlignedRecord() {
- var (
- deviceId = "root.tsg2.dev1"
- measurements = []string{"status"}
- dataTypes = []client.TSDataType{client.TEXT}
- values = []interface{}{"Working"}
- timestamp = time.Now().UTC().UnixNano() / 1000000
- )
- s.checkError(s.session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
-
- ds, err := s.session.ExecuteQueryStatement("select status from root.tsg2.dev1", nil)
- assert := s.Require()
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "Working")
-}
-
-func (s *e2eTestSuite) Test_InsertAlignedRecords() {
- var (
- deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
- measurements = [][]string{{"status"}, {"temperature"}}
- dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
- values = [][]interface{}{{"33"}, {"44"}}
- timestamps = []int64{12, 13}
- )
- s.checkError(s.session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
- ds, err := s.session.ExecuteQueryStatement("select temperature from root.al1.dev3", nil)
- assert := s.Require()
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var temperature string
- assert.NoError(ds.Scan(&temperature))
- assert.Equal(temperature, "44")
-}
-
-func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() {
- ts := time.Now().UTC().UnixNano() / 1000000
- var (
- deviceId = "root.al1.dev4"
- measurementsSlice = [][]string{
- {"restart_count", "tick_count", "price"},
- {"temperature", "description", "status"},
- }
- dataTypes = [][]client.TSDataType{
- {client.INT32, client.INT64, client.DOUBLE},
- {client.FLOAT, client.TEXT, client.BOOLEAN},
- }
- values = [][]interface{}{
- {int32(1), int64(2018), float64(1988.1)},
- {float32(12.1), "Test Device 1", false},
- }
- timestamps = []int64{ts, ts - 1}
- )
- s.checkError(s.session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
- ds, err := s.session.ExecuteStatement("select temperature from root.al1.dev4")
- assert := s.Require()
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "12.1")
-}
-func (s *e2eTestSuite) Test_InsertAlignedTablet() {
- var timeseries = []string{"root.ln.device1.**"}
- s.session.DeleteTimeseries(timeseries)
- if tablet, err := createTablet(12); err == nil {
- status, err := s.session.InsertAlignedTablet(tablet, false)
- s.checkError(status, err)
- } else {
- log.Fatal(err)
- }
- var timeout int64 = 1000
- ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout)
- assert := s.Require()
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "12")
- s.session.DeleteStorageGroup("root.ln.**")
-}
-func createTablet(rowCount int) (*client.Tablet, error) {
- tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
- {
- Measurement: "restart_count",
- DataType: client.INT32,
- Encoding: client.RLE,
- Compressor: client.SNAPPY,
- }, {
- Measurement: "price",
- DataType: client.DOUBLE,
- Encoding: client.GORILLA,
- Compressor: client.SNAPPY,
- }, {
- Measurement: "tick_count",
- DataType: client.INT64,
- Encoding: client.RLE,
- Compressor: client.SNAPPY,
- }, {
- Measurement: "temperature",
- DataType: client.FLOAT,
- Encoding: client.GORILLA,
- Compressor: client.SNAPPY,
- }, {
- Measurement: "description",
- DataType: client.TEXT,
- Encoding: client.PLAIN,
- Compressor: client.SNAPPY,
- },
- {
- Measurement: "status",
- DataType: client.BOOLEAN,
- Encoding: client.RLE,
- Compressor: client.SNAPPY,
- },
- }, rowCount)
-
- if err != nil {
- return nil, err
- }
- ts := time.Now().UTC().UnixNano() / 1000000
- for row := 0; row < int(rowCount); row++ {
- ts++
- tablet.SetTimestamp(ts, row)
- tablet.SetValueAt(rand.Int31(), 0, row)
- tablet.SetValueAt(rand.Float64(), 1, row)
- tablet.SetValueAt(rand.Int63(), 2, row)
- tablet.SetValueAt(rand.Float32(), 3, row)
- tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
- tablet.SetValueAt(bool(ts%2 == 0), 5, row)
- }
- return tablet, nil
-}
-
-func (s *e2eTestSuite) Test_InsertAlignedTablets() {
- var timeseries = []string{"root.ln.device1.**"}
- s.session.DeleteTimeseries(timeseries)
- tablet1, err := createTablet(8)
- if err != nil {
- log.Fatal(err)
- }
- tablet2, err := createTablet(4)
- if err != nil {
- log.Fatal(err)
- }
- tablets := []*client.Tablet{tablet1, tablet2}
- s.checkError(s.session.InsertAlignedTablets(tablets, false))
-
- ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", nil)
- assert := s.Require()
- assert.NoError(err)
- defer ds.Close()
- assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "8")
- s.session.DeleteStorageGroup("root.ln.**")
}