You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/09/14 01:15:54 UTC

[iotdb-client-go] 01/01: add create aligned timeseries request (#50)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch 13
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git

commit 82d95e219ff9b5e2a8b4ddcc57731303bb79ca8b
Author: fikers <fi...@gmail.com>
AuthorDate: Tue Sep 13 23:12:21 2022 +0800

    add create aligned timeseries request (#50)
---
 client/session.go          | 47 ++++++++++++++++++++++++++++++++++++++++++++++
 example/session_example.go | 22 ++++++++++++++++++++++
 test/e2e/e2e_test.go       | 34 +++++++++++++++++++++++++++++++++
 3 files changed, 103 insertions(+)

diff --git a/client/session.go b/client/session.go
index 84cf39a..1daf416 100644
--- a/client/session.go
+++ b/client/session.go
@@ -254,6 +254,53 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS
 	return status, err
 }
 
+/*
+ *create aligned time series (one per measurement) under a common prefix path
+ *params
+ *prefixPath: string, time series prefix path (starts from root)
+ *measurements: []string, sensor names
+ *dataTypes: []TSDataType, data types for time series
+ *encodings: []TSEncoding, encodings for time series
+ *compressors: []TSCompressionType, compressing types for time series
+ *measurementAlias: []string, sensor names alias
+ *return
+ *(*rpc.TSStatus, error): status returned for the request, and correctness of operation
+ */
+func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *rpc.TSStatus, err error) {
+	// The request carries plain int32 codes, so each typed slice is converted first.
+	typeCodes := make([]int32, 0, len(dataTypes))
+	for _, dataType := range dataTypes {
+		typeCodes = append(typeCodes, int32(dataType))
+	}
+	encodingCodes := make([]int32, 0, len(encodings))
+	for _, encoding := range encodings {
+		encodingCodes = append(encodingCodes, int32(encoding))
+	}
+	compressorCodes := make([]int32, 0, len(compressors))
+	for _, compressor := range compressors {
+		compressorCodes = append(compressorCodes, int32(compressor))
+	}
+	req := &rpc.TSCreateAlignedTimeseriesReq{
+		SessionId:        s.sessionId,
+		PrefixPath:       prefixPath,
+		Measurements:     measurements,
+		DataTypes:        typeCodes,
+		Encodings:        encodingCodes,
+		Compressors:      compressorCodes,
+		MeasurementAlias: measurementAlias,
+	}
+	status, err := s.client.CreateAlignedTimeseries(context.Background(), req)
+	if err == nil || status != nil {
+		return status, err
+	}
+	// The call failed without any status: reconnect and, if that works, retry once.
+	if s.reconnect() {
+		req.SessionId = s.sessionId
+		status, err = s.client.CreateAlignedTimeseries(context.Background(), req)
+	}
+	return status, err
+}
+
 /*
  *create multiple time series
  *params
diff --git a/example/session_example.go b/example/session_example.go
index c10154b..293f371 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -70,6 +70,10 @@ func main() {
 	createMultiTimeseries()
 	deleteTimeseries("root.sg1.dev1.temperature")
 
+	createAlignedTimeseries("root.sg1.dev1", []string{"status", "temperature"}, []string{"sts", "temp"})
+	deleteTimeseries("root.sg1.dev1.status")
+	deleteTimeseries("root.sg1.dev1.temperature")
+
 	insertStringRecord()
 	deleteTimeseries("root.ln.wf02.wt02.hardware")
 
@@ -274,6 +278,24 @@ func createTimeseries(path string) {
 	checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
 }
 
+// createAlignedTimeseries asks the session to create aligned time series for
+// measurements under prefixPath and hands the result to checkError.
+// Two FLOAT / PLAIN / LZ4 settings are supplied, matching the two-measurement
+// call made from main.
+func createAlignedTimeseries(prefixPath string, measurements, measurementAlias []string) {
+	dataTypes := []client.TSDataType{client.FLOAT, client.FLOAT}
+	encodings := []client.TSEncoding{client.PLAIN, client.PLAIN}
+	compressors := []client.TSCompressionType{client.LZ4, client.LZ4}
+	checkError(session.CreateAlignedTimeseries(
+		prefixPath,
+		measurements,
+		dataTypes,
+		encodings,
+		compressors,
+		measurementAlias,
+	))
+}
+
 func createMultiTimeseries() {
 	var (
 		paths       = []string{"root.sg1.dev1.temperature"}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 645ac60..09e6913 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -91,6 +91,40 @@ func (s *e2eTestSuite) Test_CreateTimeseries() {
 	assert.Equal(timeseries, "root.tsg1.dev1.status")
 }
 
+// Test_CreateAlignedTimeseries creates two aligned time series under
+// prefixPath and checks that a "show timeseries" query for each full path
+// yields a row whose scanned value equals that path.
+func (s *e2eTestSuite) Test_CreateAlignedTimeseries() {
+	var (
+		prefixPath       = "root.tsg1.dev1"
+		measurements     = []string{"status", "temperature"}
+		measurementAlias = []string{"sts", "temp"}
+		dataTypes        = []client.TSDataType{client.FLOAT, client.FLOAT}
+		encodings        = []client.TSEncoding{client.PLAIN, client.PLAIN}
+		compressors      = []client.TSCompressionType{client.LZ4, client.LZ4}
+	)
+	assert := s.Require()
+	s.checkError(s.session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
+
+	// Every measurement must now be listed by "show timeseries" under its full path.
+	for _, measurement := range measurements {
+		fullPath := fmt.Sprintf("%s.%s", prefixPath, measurement)
+		// Run each check in its own function so the deferred Close fires at the
+		// end of the iteration instead of piling up until the test returns.
+		func() {
+			ds, err := s.session.ExecuteQueryStatement(fmt.Sprintf("show timeseries %s", fullPath), nil)
+			assert.NoError(err)
+			defer ds.Close()
+
+			assert.True(ds.Next())
+			var timeseries string
+			assert.NoError(ds.Scan(&timeseries))
+			// testify's Equal takes (expected, actual).
+			assert.Equal(fullPath, timeseries)
+		}()
+	}
+}
+
 func (s *e2eTestSuite) Test_InsertRecords() {
 	var (
 		deviceId     = []string{"root.tsg1.dev1"}