You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/08 07:38:48 UTC
[iotdb-client-go] branch rel/0.13 updated: [To rel/0.13][IOTDB-4009] Add aligned timeseries APIs for go client (#49)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 9e78007 [To rel/0.13][IOTDB-4009] Add aligned timeseries APIs for go client (#49)
9e78007 is described below
commit 9e78007af5e7a97ae466ec8da82c90c6e247e82a
Author: Summer <43...@users.noreply.github.com>
AuthorDate: Mon Aug 8 15:38:43 2022 +0800
[To rel/0.13][IOTDB-4009] Add aligned timeseries APIs for go client (#49)
---
client/session.go | 145 +++++++++++++++++++++++++++++++++++---
example/session_example.go | 123 ++++++++++++++++++++++++++++++++-
test/e2e/e2e_test.go | 169 +++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 427 insertions(+), 10 deletions(-)
diff --git a/client/session.go b/client/session.go
index 84cf39a..c5914d7 100644
--- a/client/session.go
+++ b/client/session.go
@@ -415,13 +415,14 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD
func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
measurements []string,
types []TSDataType,
- values []interface{}) (*rpc.TSInsertRecordReq, error) {
+ values []interface{},
+ isAligned bool) (*rpc.TSInsertRecordReq, error) {
request := &rpc.TSInsertRecordReq{}
request.SessionId = s.sessionId
request.PrefixPath = deviceId
request.Timestamp = time
request.Measurements = measurements
-
+ request.IsAligned = &isAligned
if bys, err := valuesToBytes(types, values); err == nil {
request.Values = bys
} else {
@@ -431,7 +432,24 @@ func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
}
func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
- request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values)
+ request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false)
+ if err != nil {
+ return nil, err
+ }
+ r, err = s.client.InsertRecord(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecord(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
+func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *rpc.TSStatus, err error) {
+ request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true)
if err != nil {
return nil, err
}
@@ -452,6 +470,7 @@ type deviceData struct {
measurementsSlice [][]string
dataTypesSlice [][]TSDataType
valuesSlice [][]interface{}
+ isAligned bool
}
func (d *deviceData) Len() int {
@@ -515,6 +534,49 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64,
return r, err
}
+func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *rpc.TSStatus, err error) {
+ length := len(timestamps)
+ if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length {
+ return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
+ }
+
+ if !sorted {
+ sort.Sort(&deviceData{
+ timestamps: timestamps,
+ measurementsSlice: measurementsSlice,
+ dataTypesSlice: dataTypesSlice,
+ valuesSlice: valuesSlice,
+ })
+ }
+
+ valuesList := make([][]byte, length)
+ for i := 0; i < length; i++ {
+ if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil {
+ return nil, err
+ }
+ }
+ var isAligned = true
+ request := &rpc.TSInsertRecordsOfOneDeviceReq{
+ SessionId: s.sessionId,
+ PrefixPath: deviceId,
+ Timestamps: timestamps,
+ MeasurementsList: measurementsSlice,
+ ValuesList: valuesList,
+ IsAligned: &isAligned,
+ }
+
+ r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
/*
*insert multiple rows of data, records are independent to each other, in other words, there's no relationship
*between those records
@@ -529,7 +591,24 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64,
*/
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64) (r *rpc.TSStatus, err error) {
- request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps)
+ request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false)
+ if err != nil {
+ return nil, err
+ } else {
+ r, err = s.client.InsertRecords(context.Background(), request)
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertRecords(context.Background(), request)
+ }
+ }
+ return r, err
+ }
+}
+
+func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
+ timestamps []int64) (r *rpc.TSStatus, err error) {
+ request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true)
if err != nil {
return nil, err
} else {
@@ -557,7 +636,29 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus
}
}
}
- request, err := s.genInsertTabletsReq(tablets)
+ request, err := s.genInsertTabletsReq(tablets, false)
+ if err != nil {
+ return nil, err
+ }
+ r, err = s.client.InsertTablets(context.Background(), request)
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertTablets(context.Background(), request)
+ }
+ }
+ return r, err
+}
+
+func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+ if !sorted {
+ for _, t := range tablets {
+ if err := t.Sort(); err != nil {
+ return nil, err
+ }
+ }
+ }
+ request, err := s.genInsertTabletsReq(tablets, true)
if err != nil {
return nil, err
}
@@ -630,7 +731,7 @@ func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *Sess
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
}
-func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) {
+func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) {
var (
length = len(tablets)
deviceIds = make([]string, length)
@@ -662,12 +763,13 @@ func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsRe
ValuesList: valuesList,
TimestampsList: timestampsList,
SizeList: sizeList,
+ IsAligned: &isAligned,
}
return &request, nil
}
func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
- timestamps []int64) (*rpc.TSInsertRecordsReq, error) {
+ timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
length := len(deviceIds)
if length != len(timestamps) || length != len(measurements) || length != len(values) {
return nil, errLength
@@ -677,6 +779,7 @@ func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]strin
PrefixPaths: deviceIds,
MeasurementsList: measurements,
Timestamps: timestamps,
+ IsAligned: &isAligned,
}
v := make([][]byte, length)
for i := 0; i < len(measurements); i++ {
@@ -757,7 +860,30 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, er
return nil, err
}
}
- request, err := s.genTSInsertTabletReq(tablet)
+ request, err := s.genTSInsertTabletReq(tablet, false)
+ if err != nil {
+ return nil, err
+ }
+
+ r, err = s.client.InsertTablet(context.Background(), request)
+
+ if err != nil && r == nil {
+ if s.reconnect() {
+ request.SessionId = s.sessionId
+ r, err = s.client.InsertTablet(context.Background(), request)
+ }
+ }
+
+ return r, err
+}
+
+func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, err error) {
+ if !sorted {
+ if err := tablet.Sort(); err != nil {
+ return nil, err
+ }
+ }
+ request, err := s.genTSInsertTabletReq(tablet, true)
if err != nil {
return nil, err
}
@@ -774,7 +900,7 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, er
return r, err
}
-func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) {
+func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) (*rpc.TSInsertTabletReq, error) {
if values, err := tablet.getValuesBytes(); err == nil {
request := &rpc.TSInsertTabletReq{
SessionId: s.sessionId,
@@ -784,6 +910,7 @@ func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq,
Timestamps: tablet.GetTimestampBytes(),
Types: tablet.getDataTypes(),
Size: int32(tablet.rowCount),
+ IsAligned: &isAligned,
}
return request, nil
} else {
diff --git a/example/session_example.go b/example/session_example.go
index c10154b..58d95d3 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -87,7 +87,7 @@ func main() {
} else {
log.Fatal(err)
}
-
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
insertTablets()
deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
@@ -110,6 +110,23 @@ func main() {
//0.12.x and newer
insertRecordsOfOneDevice()
deleteTimeseries("root.sg1.dev0.*")
+
+ insertAlignedRecord()
+ deleteTimeseries("root.al1.dev1.*")
+
+ insertAlignedRecords()
+ deleteTimeseries("root.al1.**")
+
+ insertAlignedRecordsOfOneDevice()
+ deleteTimeseries("root.al1.dev4.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablet()
+ deleteTimeseries("root.ln.device1.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablets()
+ deleteTimeseries("root.ln.device1.*")
}
func printDevice1(sds *client.SessionDataSet) {
@@ -309,6 +326,25 @@ func insertRecord() {
checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
}
+func insertAlignedRecord() {
+ var (
+ deviceId = "root.al1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ checkError(session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func insertRecords() {
var (
deviceId = []string{"root.sg1.dev1"}
@@ -320,6 +356,25 @@ func insertRecords() {
checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
}
+func insertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ checkError(session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func insertRecordsOfOneDevice() {
ts := time.Now().UTC().UnixNano() / 1000000
var (
@@ -341,6 +396,42 @@ func insertRecordsOfOneDevice() {
checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
}
+func insertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ checkError(session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ sessionDataSetNew, err := session.ExecuteStatement("select restart_count,tick_count,price,temperature,description,status from root.al1.dev4")
+ if err == nil {
+ printDataSet0(sessionDataSetNew)
+ sessionDataSetNew.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+}
+
func deleteData() {
var (
paths = []string{"root.sg1.dev1.status"}
@@ -359,6 +450,22 @@ func insertTablet() {
}
}
+func insertAlignedTablet() {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertAlignedTablet(tablet, false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
+ printDevice1(ds)
+ ds.Close()
+ } else {
+ log.Fatal(err)
+ }
+}
+
func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
{
@@ -426,6 +533,20 @@ func insertTablets() {
checkError(session.InsertTablets(tablets, false))
}
+func insertAlignedTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ checkError(session.InsertAlignedTablets(tablets, false))
+}
+
func setTimeZone() {
var timeZone = "GMT"
session.SetTimeZone(timeZone)
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 645ac60..171d0d6 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -20,6 +20,9 @@
package e2e
import (
+ "fmt"
+ "log"
+ "math/rand"
"testing"
"time"
@@ -110,3 +113,169 @@ func (s *e2eTestSuite) Test_InsertRecords() {
assert.NoError(ds.Scan(&status))
assert.Equal(status, "Working")
}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecord() {
+ var (
+ deviceId = "root.tsg2.dev1"
+ measurements = []string{"status"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ values = []interface{}{"Working"}
+ timestamp = time.Now().UTC().UnixNano() / 1000000
+ )
+ s.checkError(s.session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+
+ ds, err := s.session.ExecuteQueryStatement("select status from root.tsg2.dev1", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "Working")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ s.checkError(s.session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+ ds, err := s.session.ExecuteQueryStatement("select temperature from root.al1.dev3", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var temperature string
+ assert.NoError(ds.Scan(&temperature))
+ assert.Equal(temperature, "44")
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ s.checkError(s.session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ ds, err := s.session.ExecuteStatement("select temperature from root.al1.dev4")
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "12.1")
+}
+func (s *e2eTestSuite) Test_InsertAlignedTablet() {
+ var timeseries = []string{"root.ln.device1.**"}
+ s.session.DeleteTimeseries(timeseries)
+ if tablet, err := createTablet(12); err == nil {
+ status, err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", &timeout)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "12")
+ s.session.DeleteStorageGroup("root.ln.**")
+}
+func createTablet(rowCount int) (*client.Tablet, error) {
+ tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
+ {
+ Measurement: "restart_count",
+ DataType: client.INT32,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "price",
+ DataType: client.DOUBLE,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "tick_count",
+ DataType: client.INT64,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "temperature",
+ DataType: client.FLOAT,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "description",
+ DataType: client.TEXT,
+ Encoding: client.PLAIN,
+ Compressor: client.SNAPPY,
+ },
+ {
+ Measurement: "status",
+ DataType: client.BOOLEAN,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ },
+ }, rowCount)
+
+ if err != nil {
+ return nil, err
+ }
+ ts := time.Now().UTC().UnixNano() / 1000000
+ for row := 0; row < int(rowCount); row++ {
+ ts++
+ tablet.SetTimestamp(ts, row)
+ tablet.SetValueAt(rand.Int31(), 0, row)
+ tablet.SetValueAt(rand.Float64(), 1, row)
+ tablet.SetValueAt(rand.Int63(), 2, row)
+ tablet.SetValueAt(rand.Float32(), 3, row)
+ tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+ tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+ }
+ return tablet, nil
+}
+
+func (s *e2eTestSuite) Test_InsertAlignedTablets() {
+ var timeseries = []string{"root.ln.device1.**"}
+ s.session.DeleteTimeseries(timeseries)
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ s.checkError(s.session.InsertAlignedTablets(tablets, false))
+
+ ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", nil)
+ assert := s.Require()
+ assert.NoError(err)
+ defer ds.Close()
+ assert.True(ds.Next())
+ var status string
+ assert.NoError(ds.Scan(&status))
+ assert.Equal(status, "8")
+ s.session.DeleteStorageGroup("root.ln.**")
+}