You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/09 07:25:52 UTC
[iotdb-client-go] branch main updated: adds sessionPool (#79)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6816748 adds sessionPool (#79)
6816748 is described below
commit 68167480e3591f39cc2129ea4b35855be68b9755
Author: Liwen Fu <fu...@gmail.com>
AuthorDate: Thu Mar 9 15:25:48 2023 +0800
adds sessionPool (#79)
Co-authored-by: fuliwen <fu...@yonyou.com>
---
README.md | 68 +++
README_ZH.md | 69 +++
client/session.go | 36 +-
client/sessionpool.go | 149 +++++
example/session_pool/session_pool_example.go | 784 +++++++++++++++++++++++++++
5 files changed, 1089 insertions(+), 17 deletions(-)
diff --git a/README.md b/README.md
index 0a97fba..5466cf0 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,74 @@ curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main
go run session_example.go
```
+## How to Use the SessionPool
+SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider how to reuse a session connection.
+If there is no available connections and the pool reaches its max size, the all methods will hang until there is a available connection.
+The PutBack method must be called after use
+
+### New sessionPool
+standalone
+
+```golang
+
+config := &client.PoolConfig{
+ Host: host,
+ Port: port,
+ UserName: user,
+ Password: password,
+}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+cluster or doubleLive
+
+```golang
+
+config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+### Get session through sessionPool, putback after use
+
+set storage group
+
+```golang
+
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err == nil {
+ session.SetStorageGroup(sg)
+}
+
+```
+
+query statement
+
+```golang
+
+var timeout int64 = 1000
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err != nil {
+ log.Print(err)
+ return
+}
+sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+if err == nil {
+ defer sessionDataSet.Close()
+ printDataSet1(sessionDataSet)
+} else {
+ log.Println(err)
+}
+
+```
+
+
## Developer environment requirements for iotdb-client-go
### OS
diff --git a/README_ZH.md b/README_ZH.md
index 1c17ae7..33bd0db 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -76,6 +76,75 @@ curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main
go run session_example.go
```
+## SessionPool
+通过SessionPool管理session,用户不需要考虑如何重用session,当到达pool的最大值时,获取session的请求会阻塞
+注意:session使用完成后需要调用PutBack方法
+
+### 创建sessionPool
+
+单实例
+```golang
+
+config := &client.PoolConfig{
+ Host: host,
+ Port: port,
+ UserName: user,
+ Password: password,
+}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+分布式或双活
+
+```golang
+
+config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+
+### 使用sessionPool获取session,使用完手动调用PutBack
+
+例1:设置存储组
+
+```golang
+
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err == nil {
+ session.SetStorageGroup(sg)
+}
+
+```
+
+例2:查询
+
+```golang
+
+var timeout int64 = 1000
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err != nil {
+ log.Print(err)
+ return
+}
+sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+if err == nil {
+ defer sessionDataSet.Close()
+ printDataSet1(sessionDataSet)
+} else {
+ log.Println(err)
+}
+
+```
+
+
## iotdb-client-go的开发者环境要求
### 操作系统
diff --git a/client/session.go b/client/session.go
index 17a3c9e..1851d91 100644
--- a/client/session.go
+++ b/client/session.go
@@ -121,11 +121,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
}
type ClusterConfig struct {
- NodeUrls []string //ip:port
- UserName string
- Password string
- FetchSize int32
- TimeZone string
+ NodeUrls []string //ip:port
+ UserName string
+ Password string
+ FetchSize int32
+ TimeZone string
+ ConnectRetryMax int
}
type ClusterSession struct {
@@ -997,12 +998,12 @@ func NewSession(config *Config) Session {
return Session{config: config}
}
-func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+func NewClusterSession(clusterConfig *ClusterConfig) Session {
session := Session{}
node := endPoint{}
- for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
- node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
- node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+ for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+ node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+ node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
endPointList.PushBack(node)
}
var err error
@@ -1020,7 +1021,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session {
log.Println(err)
} else {
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
- ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+ clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
break
}
}
@@ -1080,14 +1081,15 @@ func (s *Session) initClusterConn(node endPoint) error {
}
-func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
return &Config{
- Host: host,
- Port: port,
- UserName: userName,
- Password: passWord,
- FetchSize: fetchSize,
- TimeZone: timeZone,
+ Host: host,
+ Port: port,
+ UserName: userName,
+ Password: passWord,
+ FetchSize: fetchSize,
+ TimeZone: timeZone,
+ ConnectRetryMax: connectRetryMax,
}
}
diff --git a/client/sessionpool.go b/client/sessionpool.go
new file mode 100644
index 0000000..156ce2a
--- /dev/null
+++ b/client/sessionpool.go
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "errors"
+ "log"
+ "runtime"
+ "time"
+)
+
+var errTimeout = errors.New("get session timeout")
+var errPoolClosed = errors.New("sessionPool has closed")
+var defaultMultiple = 5
+
+type SessionPool struct {
+ config *PoolConfig
+ maxSize int
+ waitToGetSessionTimeoutInMs int
+ enableCompression bool
+ connectionTimeoutInMs int
+ ch chan Session
+ sem chan int8
+}
+
+type PoolConfig struct {
+ Host string
+ Port string
+ NodeUrls []string
+ UserName string
+ Password string
+ FetchSize int32
+ TimeZone string
+ ConnectRetryMax int
+}
+
+func NewSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int,
+ enableCompression bool) SessionPool {
+ if maxSize <= 0 {
+ maxSize = runtime.NumCPU() * defaultMultiple
+ }
+ var ch = make(chan Session, maxSize)
+ var sem = make(chan int8, maxSize)
+ return SessionPool{
+ config: conf,
+ maxSize: maxSize,
+ waitToGetSessionTimeoutInMs: waitToGetSessionTimeoutInMs,
+ connectionTimeoutInMs: connectionTimeoutInMs,
+ enableCompression: enableCompression,
+ ch: ch,
+ sem: sem,
+ }
+}
+
+func (spool *SessionPool) GetSession() (session Session, err error) {
+ for {
+ select {
+ case spool.sem <- 1:
+ select {
+ case session, ok := <-spool.ch:
+ if ok {
+ return session, nil
+ } else {
+ log.Println("sessionPool has closed")
+ return session, errPoolClosed
+ }
+ default:
+ config := spool.config
+ session, err := spool.ConstructSession(config)
+ return session, err
+ }
+ case <-time.After(time.Millisecond * time.Duration(spool.waitToGetSessionTimeoutInMs)):
+ log.Println("get session timeout")
+ return session, errTimeout
+ }
+ }
+}
+
+func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
+ if len(config.NodeUrls) > 0 {
+ session = NewClusterSession(getClusterSessionConfig(config))
+ if err := session.OpenCluster(spool.enableCompression); err != nil {
+ log.Print(err)
+ return session, err
+ }
+ } else {
+ session = NewSession(getSessionConfig(config))
+ if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
+ log.Print(err)
+ return session, err
+ }
+ }
+ return session, nil
+}
+
+func getSessionConfig(config *PoolConfig) *Config {
+ return &Config{
+ Host: config.Host,
+ Port: config.Port,
+ UserName: config.UserName,
+ Password: config.Password,
+ FetchSize: config.FetchSize,
+ TimeZone: config.TimeZone,
+ ConnectRetryMax: config.ConnectRetryMax,
+ }
+}
+
+func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
+ return &ClusterConfig{
+ NodeUrls: config.NodeUrls,
+ UserName: config.UserName,
+ Password: config.Password,
+ FetchSize: config.FetchSize,
+ TimeZone: config.TimeZone,
+ ConnectRetryMax: config.ConnectRetryMax,
+ }
+}
+
+func (spool *SessionPool) PutBack(session Session) {
+ if session.trans.IsOpen() {
+ spool.ch <- session
+ }
+ <-spool.sem
+}
+
+func (spool *SessionPool) Close() {
+ close(spool.ch)
+ for s := range spool.ch {
+ s.Close()
+ }
+ close(spool.sem)
+}
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
new file mode 100644
index 0000000..c43a4fb
--- /dev/null
+++ b/example/session_pool/session_pool_example.go
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/iotdb-client-go/client"
+ "github.com/apache/iotdb-client-go/rpc"
+)
+
+var (
+ host string
+ port string
+ user string
+ password string
+)
+var sessionPool client.SessionPool
+
+func main() {
+ flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
+ flag.StringVar(&port, "port", "6667", "--port=6667")
+ flag.StringVar(&user, "user", "root", "--user=root")
+ flag.StringVar(&password, "password", "root", "--password=root")
+ flag.Parse()
+ config := &client.PoolConfig{
+ Host: host,
+ Port: port,
+ UserName: user,
+ Password: password,
+ }
+ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+ defer sessionPool.Close()
+ var wg sync.WaitGroup
+ for i := 0; i < 10000; i++ {
+ var j = i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ setStorageGroup(fmt.Sprintf("root.ln-%d", j))
+ deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
+
+ }()
+
+ }
+ setStorageGroup("root.ln1")
+ setStorageGroup("root.ln2")
+ deleteStorageGroups("root.ln1", "root.ln2")
+
+ createTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.sg1.dev1.status")
+
+ createMultiTimeseries()
+ deleteTimeseries("root.sg1.dev1.temperature")
+
+ createAlignedTimeseries("root.sg1.dev1", []string{"status", "temperature"}, []string{"sts", "temp"})
+ deleteTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.sg1.dev1.temperature")
+
+ insertStringRecord()
+ deleteTimeseries("root.ln.wf02.wt02.hardware")
+
+ insertRecord()
+ deleteTimeseries("root.sg1.dev1.status")
+
+ insertRecords()
+ deleteTimeseries("root.sg1.dev1.status")
+
+ insertTablet()
+
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+ insertTablets()
+ deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+
+ insertRecord()
+ deleteData()
+
+ setTimeZone()
+ if tz, err := getTimeZone(); err == nil {
+ fmt.Printf("TimeZone: %s\n", tz)
+ } else {
+ fmt.Printf("getTimeZone ERROR: %v\n", err)
+ }
+
+ executeStatement()
+ executeQueryStatement("select count(s3) from root.sg1.dev1")
+ executeRawDataQuery()
+ executeBatchStatement()
+
+ deleteTimeseries("root.sg1.dev1.status")
+ deleteTimeseries("root.ln.wf02.wt02.s5")
+
+ //0.12.x and newer
+ insertRecordsOfOneDevice()
+ deleteTimeseries("root.sg1.dev0.*")
+
+ insertAlignedRecord()
+ deleteTimeseries("root.al1.dev1.*")
+
+ insertAlignedRecords()
+ deleteTimeseries("root.al1.**")
+
+ insertAlignedRecordsOfOneDevice()
+ deleteTimeseries("root.al1.dev4.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablet()
+ deleteTimeseries("root.ln.device1.*")
+
+ deleteStorageGroup("root.ln")
+ insertAlignedTablets()
+ deleteTimeseries("root.ln.device1.*")
+ executeQueryStatement("show timeseries root.**")
+ wg.Wait()
+
+}
+
+func setStorageGroup(sg string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ session.SetStorageGroup(sg)
+ }
+}
+
+func deleteStorageGroup(sg string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteStorageGroup(sg))
+ }
+}
+
+func deleteStorageGroups(sgs ...string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteStorageGroups(sgs...))
+ }
+}
+
+func createTimeseries(path string) {
+ var (
+ dataType = client.FLOAT
+ encoding = client.PLAIN
+ compressor = client.SNAPPY
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
+ }
+}
+
+func createAlignedTimeseries(prefixPath string, measurements, measurementAlias []string) {
+ var (
+ dataTypes = []client.TSDataType{
+ client.FLOAT,
+ client.FLOAT,
+ }
+ encodings = []client.TSEncoding{
+ client.PLAIN,
+ client.PLAIN,
+ }
+ compressors = []client.TSCompressionType{
+ client.LZ4,
+ client.LZ4,
+ }
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
+ }
+
+}
+
+func createMultiTimeseries() {
+ var (
+ paths = []string{"root.sg1.dev1.temperature"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ encodings = []client.TSEncoding{client.PLAIN}
+ compressors = []client.TSCompressionType{client.SNAPPY}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors))
+ }
+
+}
+
+func deleteTimeseries(paths ...string) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteTimeseries(paths))
+ }
+
+}
+
+func insertStringRecord() {
+ var (
+ deviceId = "root.ln.wf02.wt02"
+ measurements = []string{"hardware"}
+ values = []string{"123"}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertStringRecord(deviceId, measurements, values, timestamp))
+ }
+
+}
+
+func insertRecord() {
+ var (
+ deviceId = "root.sg1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
+ }
+
+}
+
+func insertAlignedRecord() {
+ var (
+ deviceId = "root.al1.dev1"
+ measurements = []string{"status"}
+ values = []interface{}{"123"}
+ dataTypes = []client.TSDataType{client.TEXT}
+ timestamp int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func insertRecords() {
+ var (
+ deviceId = []string{"root.sg1.dev1"}
+ measurements = [][]string{{"status"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}}
+ values = [][]interface{}{{"123"}}
+ timestamp = []int64{12}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
+ }
+
+}
+
+func insertAlignedRecords() {
+ var (
+ deviceIds = []string{"root.al1.dev2", "root.al1.dev3"}
+ measurements = [][]string{{"status"}, {"temperature"}}
+ dataTypes = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+ values = [][]interface{}{{"33"}, {"44"}}
+ timestamps = []int64{12, 13}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func insertRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.sg1.dev0"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ }
+
+}
+
+func insertAlignedRecordsOfOneDevice() {
+ ts := time.Now().UTC().UnixNano() / 1000000
+ var (
+ deviceId = "root.al1.dev4"
+ measurementsSlice = [][]string{
+ {"restart_count", "tick_count", "price"},
+ {"temperature", "description", "status"},
+ }
+ dataTypes = [][]client.TSDataType{
+ {client.INT32, client.INT64, client.DOUBLE},
+ {client.FLOAT, client.TEXT, client.BOOLEAN},
+ }
+ values = [][]interface{}{
+ {int32(1), int64(2018), float64(1988.1)},
+ {float32(12.1), "Test Device 1", false},
+ }
+ timestamps = []int64{ts, ts - 1}
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+ sessionDataSet, err := session.ExecuteStatement("show devices")
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+ sessionDataSetNew, err := session.ExecuteStatement("select restart_count,tick_count,price,temperature,description,status from root.al1.dev4")
+ if err == nil {
+ printDataSet0(sessionDataSetNew)
+ sessionDataSetNew.Close()
+ } else {
+ log.Println(err)
+ }
+ fmt.Println()
+ }
+
+}
+
+func deleteData() {
+ var (
+ paths = []string{"root.sg1.dev1.status"}
+ startTime int64 = 0
+ endTime int64 = 12
+ )
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.DeleteData(paths, startTime, endTime))
+ }
+
+}
+
+func insertTablet() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertTablet(tablet, false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+}
+
+func insertAlignedTablet() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ if tablet, err := createTablet(12); err == nil {
+ status, err := session.InsertAlignedTablet(tablet, false)
+ checkError(status, err)
+ } else {
+ log.Fatal(err)
+ }
+ var timeout int64 = 1000
+ if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
+ printDevice1(ds)
+ ds.Close()
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+}
+
+func createTablet(rowCount int) (*client.Tablet, error) {
+ tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
+ {
+ Measurement: "restart_count",
+ DataType: client.INT32,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "price",
+ DataType: client.DOUBLE,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "tick_count",
+ DataType: client.INT64,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "temperature",
+ DataType: client.FLOAT,
+ Encoding: client.GORILLA,
+ Compressor: client.SNAPPY,
+ }, {
+ Measurement: "description",
+ DataType: client.TEXT,
+ Encoding: client.PLAIN,
+ Compressor: client.SNAPPY,
+ },
+ {
+ Measurement: "status",
+ DataType: client.BOOLEAN,
+ Encoding: client.RLE,
+ Compressor: client.SNAPPY,
+ },
+ }, rowCount)
+
+ if err != nil {
+ return nil, err
+ }
+ ts := time.Now().UTC().UnixNano() / 1000000
+ for row := 0; row < int(rowCount); row++ {
+ ts++
+ tablet.SetTimestamp(ts, row)
+ tablet.SetValueAt(rand.Int31(), 0, row)
+ tablet.SetValueAt(rand.Float64(), 1, row)
+ tablet.SetValueAt(rand.Int63(), 2, row)
+ tablet.SetValueAt(rand.Float32(), 3, row)
+ tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+ tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+ }
+ return tablet, nil
+}
+
+func insertTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertTablets(tablets, false))
+ }
+
+}
+
+func insertAlignedTablets() {
+ tablet1, err := createTablet(8)
+ if err != nil {
+ log.Fatal(err)
+ }
+ tablet2, err := createTablet(4)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1, tablet2}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.InsertAlignedTablets(tablets, false))
+ }
+
+}
+
+func setTimeZone() {
+ var timeZone = "GMT"
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ session.SetTimeZone(timeZone)
+ }
+
+}
+
+func getTimeZone() (string, error) {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ return session.GetTimeZone()
+ }
+ return "", err
+}
+
+func executeQueryStatement(sql string) {
+ var timeout int64 = 1000
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+ if err == nil {
+ defer sessionDataSet.Close()
+ printDataSet1(sessionDataSet)
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeStatement() {
+ var sql = "show storage group"
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ sessionDataSet, err := session.ExecuteStatement(sql)
+ if err == nil {
+ printDataSet0(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeRawDataQuery() {
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)")
+ var (
+ paths []string = []string{"root.ln.wf02.wt02.s5"}
+ startTime int64 = 1
+ endTime int64 = 200
+ )
+ sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
+ if err == nil {
+ printDataSet2(sessionDataSet)
+ sessionDataSet.Close()
+ } else {
+ log.Println(err)
+ }
+}
+
+func executeBatchStatement() {
+ var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
+ "insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err == nil {
+ checkError(session.ExecuteBatchStatement(sqls))
+ }
+
+}
+
+func printDevice1(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for _, columnName := range sds.GetColumnNames() {
+ fmt.Printf("%s\t", columnName)
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+ if showTimestamp {
+ fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+ }
+
+ var restartCount int32
+ var price float64
+ var tickCount int64
+ var temperature float32
+ var description string
+ var status bool
+
+ // All of iotdb datatypes can be scan into string variables
+ // var restartCount string
+ // var price string
+ // var tickCount string
+ // var temperature string
+ // var description string
+ // var status string
+
+ if err := sds.Scan(&restartCount, &price, &tickCount, &temperature, &description, &status); err != nil {
+ log.Fatal(err)
+ }
+
+ whitespace := "\t\t"
+ fmt.Printf("%v%s", restartCount, whitespace)
+ fmt.Printf("%v%s", price, whitespace)
+ fmt.Printf("%v%s", tickCount, whitespace)
+ fmt.Printf("%v%s", temperature, whitespace)
+ fmt.Printf("%v%s", description, whitespace)
+ fmt.Printf("%v%s", status, whitespace)
+
+ fmt.Println()
+ }
+}
+
+func printDataSet0(sessionDataSet *client.SessionDataSet) {
+ showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() {
+ if showTimestamp {
+ fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName))
+ }
+ for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+ columnName := sessionDataSet.GetColumnName(i)
+ switch sessionDataSet.GetColumnDataType(i) {
+ case client.BOOLEAN:
+ fmt.Print(sessionDataSet.GetBool(columnName))
+ case client.INT32:
+ fmt.Print(sessionDataSet.GetInt32(columnName))
+ case client.INT64:
+ fmt.Print(sessionDataSet.GetInt64(columnName))
+ case client.FLOAT:
+ fmt.Print(sessionDataSet.GetFloat(columnName))
+ case client.DOUBLE:
+ fmt.Print(sessionDataSet.GetDouble(columnName))
+ case client.TEXT:
+ fmt.Print(sessionDataSet.GetText(columnName))
+ default:
+ }
+ fmt.Print("\t\t")
+ }
+ fmt.Println()
+ }
+}
+
+func printDataSet1(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sds.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+ if showTimestamp {
+ fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+ }
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ columnName := sds.GetColumnName(i)
+ v := sds.GetValue(columnName)
+ if v == nil {
+ v = "null"
+ }
+ fmt.Printf("%v\t\t", v)
+ }
+ fmt.Println()
+ }
+}
+
+func printDataSet2(sds *client.SessionDataSet) {
+ showTimestamp := !sds.IsIgnoreTimeStamp()
+ if showTimestamp {
+ fmt.Print("Time\t\t\t\t")
+ }
+
+ for i := 0; i < sds.GetColumnCount(); i++ {
+ fmt.Printf("%s\t", sds.GetColumnName(i))
+ }
+ fmt.Println()
+
+ for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+ if showTimestamp {
+ fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+ }
+
+ if record, err := sds.GetRowRecord(); err == nil {
+ for _, field := range record.GetFields() {
+ v := field.GetValue()
+ if field.IsNull() {
+ v = "null"
+ }
+ fmt.Printf("%v\t\t", v)
+ }
+ fmt.Println()
+ }
+ }
+}
+
+func checkError(status *rpc.TSStatus, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if status != nil {
+ if err = client.VerifySuccess(status); err != nil {
+ log.Println(err)
+ }
+ }
+}
+
+// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection
+func useSessionPool() {
+
+ config := &client.PoolConfig{
+ UserName: user,
+ Password: password,
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+ }
+ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+ defer sessionPool.Close()
+ session, err := sessionPool.GetSession()
+ defer sessionPool.PutBack(session)
+ if err != nil {
+ log.Print(err)
+ return
+ }
+
+}