You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/09 07:25:52 UTC

[iotdb-client-go] branch main updated: adds sessionPool (#79)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6816748  adds sessionPool (#79)
6816748 is described below

commit 68167480e3591f39cc2129ea4b35855be68b9755
Author: Liwen Fu <fu...@gmail.com>
AuthorDate: Thu Mar 9 15:25:48 2023 +0800

    adds sessionPool (#79)
    
    Co-authored-by: fuliwen <fu...@yonyou.com>
---
 README.md                                    |  68 +++
 README_ZH.md                                 |  69 +++
 client/session.go                            |  36 +-
 client/sessionpool.go                        | 149 +++++
 example/session_pool/session_pool_example.go | 784 +++++++++++++++++++++++++++
 5 files changed, 1089 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 0a97fba..5466cf0 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,74 @@ curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main
 go run session_example.go
 ```
 
+## How to Use the SessionPool
+SessionPool is a wrapper around a set of Sessions. With SessionPool, the user does not need to consider how to reuse a session connection.
+If there are no available connections and the pool has reached its max size, requests for a session will block until a connection becomes available or the wait timeout elapses.
+The PutBack method must be called after use.
+
+### New sessionPool
+standalone
+
+```golang
+
+config := &client.PoolConfig{
+    Host:     host,
+    Port:     port,
+    UserName: user,
+    Password: password,
+}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+cluster or doubleLive
+
+```golang
+
+config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+### Get a session from the sessionPool and put it back after use
+
+set storage group
+
+```golang
+
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err == nil {
+    session.SetStorageGroup(sg)
+}
+
+```
+
+query statement
+
+```golang
+
+var timeout int64 = 1000
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err != nil {
+    log.Print(err)
+    return
+}
+sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+if err == nil {
+    defer sessionDataSet.Close()
+    printDataSet1(sessionDataSet)
+} else {
+    log.Println(err)
+}
+
+```
+
+
 ## Developer environment requirements for iotdb-client-go
 
 ### OS
diff --git a/README_ZH.md b/README_ZH.md
index 1c17ae7..33bd0db 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -76,6 +76,75 @@ curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main
 go run session_example.go
 ```
 
+## SessionPool
+通过SessionPool管理session,用户不需要考虑如何重用session,当到达pool的最大值时,获取session的请求会阻塞
+注意:session使用完成后需要调用PutBack方法
+
+### 创建sessionPool
+
+单实例
+```golang
+
+config := &client.PoolConfig{
+    Host:     host,
+    Port:     port,
+    UserName: user,
+    Password: password,
+}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+分布式或双活
+
+```golang
+
+config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+
+### 使用sessionPool获取session,使用完手动调用PutBack
+
+例1:设置存储组
+
+```golang
+
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err == nil {
+    session.SetStorageGroup(sg)
+}
+
+```
+
+例2:查询
+
+```golang
+
+var timeout int64 = 1000
+session, err := sessionPool.GetSession()
+defer sessionPool.PutBack(session)
+if err != nil {
+    log.Print(err)
+    return
+}
+sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+if err == nil {
+    defer sessionDataSet.Close()
+    printDataSet1(sessionDataSet)
+} else {
+    log.Println(err)
+}
+
+```
+
+
 ## iotdb-client-go的开发者环境要求
 
 ### 操作系统
diff --git a/client/session.go b/client/session.go
index 17a3c9e..1851d91 100644
--- a/client/session.go
+++ b/client/session.go
@@ -121,11 +121,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
 }
 
 type ClusterConfig struct {
-	NodeUrls  []string //ip:port
-	UserName  string
-	Password  string
-	FetchSize int32
-	TimeZone  string
+	NodeUrls        []string //ip:port
+	UserName        string
+	Password        string
+	FetchSize       int32
+	TimeZone        string
+	ConnectRetryMax int
 }
 
 type ClusterSession struct {
@@ -997,12 +998,12 @@ func NewSession(config *Config) Session {
 	return Session{config: config}
 }
 
-func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+func NewClusterSession(clusterConfig *ClusterConfig) Session {
 	session := Session{}
 	node := endPoint{}
-	for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
-		node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
-		node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+	for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+		node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+		node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
 		endPointList.PushBack(node)
 	}
 	var err error
@@ -1020,7 +1021,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session {
 					log.Println(err)
 				} else {
 					session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
-						ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+						clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
 					break
 				}
 			}
@@ -1080,14 +1081,15 @@ func (s *Session) initClusterConn(node endPoint) error {
 
 }
 
-func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
 	return &Config{
-		Host:      host,
-		Port:      port,
-		UserName:  userName,
-		Password:  passWord,
-		FetchSize: fetchSize,
-		TimeZone:  timeZone,
+		Host:            host,
+		Port:            port,
+		UserName:        userName,
+		Password:        passWord,
+		FetchSize:       fetchSize,
+		TimeZone:        timeZone,
+		ConnectRetryMax: connectRetryMax,
 	}
 }
 
diff --git a/client/sessionpool.go b/client/sessionpool.go
new file mode 100644
index 0000000..156ce2a
--- /dev/null
+++ b/client/sessionpool.go
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+	"errors"
+	"log"
+	"runtime"
+	"time"
+)
+
+var errTimeout = errors.New("get session timeout")
+var errPoolClosed = errors.New("sessionPool has closed")
+var defaultMultiple = 5
+
+type SessionPool struct {
+	config                      *PoolConfig
+	maxSize                     int
+	waitToGetSessionTimeoutInMs int
+	enableCompression           bool
+	connectionTimeoutInMs       int
+	ch                          chan Session
+	sem                         chan int8
+}
+
+type PoolConfig struct {
+	Host            string
+	Port            string
+	NodeUrls        []string
+	UserName        string
+	Password        string
+	FetchSize       int32
+	TimeZone        string
+	ConnectRetryMax int
+}
+
+func NewSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int,
+	enableCompression bool) SessionPool {
+	if maxSize <= 0 {
+		maxSize = runtime.NumCPU() * defaultMultiple
+	}
+	var ch = make(chan Session, maxSize)
+	var sem = make(chan int8, maxSize)
+	return SessionPool{
+		config:                      conf,
+		maxSize:                     maxSize,
+		waitToGetSessionTimeoutInMs: waitToGetSessionTimeoutInMs,
+		connectionTimeoutInMs:       connectionTimeoutInMs,
+		enableCompression:           enableCompression,
+		ch:                          ch,
+		sem:                         sem,
+	}
+}
+
+func (spool *SessionPool) GetSession() (session Session, err error) {
+	for {
+		select {
+		case spool.sem <- 1:
+			select {
+			case session, ok := <-spool.ch:
+				if ok {
+					return session, nil
+				} else {
+					log.Println("sessionPool has closed")
+					return session, errPoolClosed
+				}
+			default:
+				config := spool.config
+				session, err := spool.ConstructSession(config)
+				return session, err
+			}
+		case <-time.After(time.Millisecond * time.Duration(spool.waitToGetSessionTimeoutInMs)):
+			log.Println("get session timeout")
+			return session, errTimeout
+		}
+	}
+}
+
+func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
+	if len(config.NodeUrls) > 0 {
+		session = NewClusterSession(getClusterSessionConfig(config))
+		if err := session.OpenCluster(spool.enableCompression); err != nil {
+			log.Print(err)
+			return session, err
+		}
+	} else {
+		session = NewSession(getSessionConfig(config))
+		if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
+			log.Print(err)
+			return session, err
+		}
+	}
+	return session, nil
+}
+
+func getSessionConfig(config *PoolConfig) *Config {
+	return &Config{
+		Host:            config.Host,
+		Port:            config.Port,
+		UserName:        config.UserName,
+		Password:        config.Password,
+		FetchSize:       config.FetchSize,
+		TimeZone:        config.TimeZone,
+		ConnectRetryMax: config.ConnectRetryMax,
+	}
+}
+
+func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
+	return &ClusterConfig{
+		NodeUrls:        config.NodeUrls,
+		UserName:        config.UserName,
+		Password:        config.Password,
+		FetchSize:       config.FetchSize,
+		TimeZone:        config.TimeZone,
+		ConnectRetryMax: config.ConnectRetryMax,
+	}
+}
+
+func (spool *SessionPool) PutBack(session Session) {
+	if session.trans.IsOpen() {
+		spool.ch <- session
+	}
+	<-spool.sem
+}
+
+func (spool *SessionPool) Close() {
+	close(spool.ch)
+	for s := range spool.ch {
+		s.Close()
+	}
+	close(spool.sem)
+}
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
new file mode 100644
index 0000000..c43a4fb
--- /dev/null
+++ b/example/session_pool/session_pool_example.go
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/apache/iotdb-client-go/client"
+	"github.com/apache/iotdb-client-go/rpc"
+)
+
+var (
+	host     string
+	port     string
+	user     string
+	password string
+)
+var sessionPool client.SessionPool
+
+func main() {
+	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
+	flag.StringVar(&port, "port", "6667", "--port=6667")
+	flag.StringVar(&user, "user", "root", "--user=root")
+	flag.StringVar(&password, "password", "root", "--password=root")
+	flag.Parse()
+	config := &client.PoolConfig{
+		Host:     host,
+		Port:     port,
+		UserName: user,
+		Password: password,
+	}
+	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+	defer sessionPool.Close()
+	var wg sync.WaitGroup
+	for i := 0; i < 10000; i++ {
+		var j = i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			setStorageGroup(fmt.Sprintf("root.ln-%d", j))
+			deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
+
+		}()
+
+	}
+	setStorageGroup("root.ln1")
+	setStorageGroup("root.ln2")
+	deleteStorageGroups("root.ln1", "root.ln2")
+
+	createTimeseries("root.sg1.dev1.status")
+	deleteTimeseries("root.sg1.dev1.status")
+
+	createMultiTimeseries()
+	deleteTimeseries("root.sg1.dev1.temperature")
+
+	createAlignedTimeseries("root.sg1.dev1", []string{"status", "temperature"}, []string{"sts", "temp"})
+	deleteTimeseries("root.sg1.dev1.status")
+	deleteTimeseries("root.sg1.dev1.temperature")
+
+	insertStringRecord()
+	deleteTimeseries("root.ln.wf02.wt02.hardware")
+
+	insertRecord()
+	deleteTimeseries("root.sg1.dev1.status")
+
+	insertRecords()
+	deleteTimeseries("root.sg1.dev1.status")
+
+	insertTablet()
+
+	deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+	insertTablets()
+	deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")
+
+	insertRecord()
+	deleteData()
+
+	setTimeZone()
+	if tz, err := getTimeZone(); err == nil {
+		fmt.Printf("TimeZone: %s\n", tz)
+	} else {
+		fmt.Printf("getTimeZone ERROR: %v\n", err)
+	}
+
+	executeStatement()
+	executeQueryStatement("select count(s3) from root.sg1.dev1")
+	executeRawDataQuery()
+	executeBatchStatement()
+
+	deleteTimeseries("root.sg1.dev1.status")
+	deleteTimeseries("root.ln.wf02.wt02.s5")
+
+	//0.12.x and newer
+	insertRecordsOfOneDevice()
+	deleteTimeseries("root.sg1.dev0.*")
+
+	insertAlignedRecord()
+	deleteTimeseries("root.al1.dev1.*")
+
+	insertAlignedRecords()
+	deleteTimeseries("root.al1.**")
+
+	insertAlignedRecordsOfOneDevice()
+	deleteTimeseries("root.al1.dev4.*")
+
+	deleteStorageGroup("root.ln")
+	insertAlignedTablet()
+	deleteTimeseries("root.ln.device1.*")
+
+	deleteStorageGroup("root.ln")
+	insertAlignedTablets()
+	deleteTimeseries("root.ln.device1.*")
+	executeQueryStatement("show timeseries root.**")
+	wg.Wait()
+
+}
+
+func setStorageGroup(sg string) {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		session.SetStorageGroup(sg)
+	}
+}
+
+func deleteStorageGroup(sg string) {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.DeleteStorageGroup(sg))
+	}
+}
+
+func deleteStorageGroups(sgs ...string) {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.DeleteStorageGroups(sgs...))
+	}
+}
+
+func createTimeseries(path string) {
+	var (
+		dataType   = client.FLOAT
+		encoding   = client.PLAIN
+		compressor = client.SNAPPY
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
+	}
+}
+
+func createAlignedTimeseries(prefixPath string, measurements, measurementAlias []string) {
+	var (
+		dataTypes = []client.TSDataType{
+			client.FLOAT,
+			client.FLOAT,
+		}
+		encodings = []client.TSEncoding{
+			client.PLAIN,
+			client.PLAIN,
+		}
+		compressors = []client.TSCompressionType{
+			client.LZ4,
+			client.LZ4,
+		}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
+	}
+
+}
+
+func createMultiTimeseries() {
+	var (
+		paths       = []string{"root.sg1.dev1.temperature"}
+		dataTypes   = []client.TSDataType{client.TEXT}
+		encodings   = []client.TSEncoding{client.PLAIN}
+		compressors = []client.TSCompressionType{client.SNAPPY}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors))
+	}
+
+}
+
+func deleteTimeseries(paths ...string) {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.DeleteTimeseries(paths))
+	}
+
+}
+
+func insertStringRecord() {
+	var (
+		deviceId           = "root.ln.wf02.wt02"
+		measurements       = []string{"hardware"}
+		values             = []string{"123"}
+		timestamp    int64 = 12
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertStringRecord(deviceId, measurements, values, timestamp))
+	}
+
+}
+
+func insertRecord() {
+	var (
+		deviceId           = "root.sg1.dev1"
+		measurements       = []string{"status"}
+		values             = []interface{}{"123"}
+		dataTypes          = []client.TSDataType{client.TEXT}
+		timestamp    int64 = 12
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
+	}
+
+}
+
+func insertAlignedRecord() {
+	var (
+		deviceId           = "root.al1.dev1"
+		measurements       = []string{"status"}
+		values             = []interface{}{"123"}
+		dataTypes          = []client.TSDataType{client.TEXT}
+		timestamp    int64 = 12
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertAlignedRecord(deviceId, measurements, dataTypes, values, timestamp))
+		sessionDataSet, err := session.ExecuteStatement("show devices")
+		if err == nil {
+			printDataSet0(sessionDataSet)
+			sessionDataSet.Close()
+		} else {
+			log.Println(err)
+		}
+		fmt.Println()
+	}
+
+}
+
+func insertRecords() {
+	var (
+		deviceId     = []string{"root.sg1.dev1"}
+		measurements = [][]string{{"status"}}
+		dataTypes    = [][]client.TSDataType{{client.TEXT}}
+		values       = [][]interface{}{{"123"}}
+		timestamp    = []int64{12}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
+	}
+
+}
+
+func insertAlignedRecords() {
+	var (
+		deviceIds    = []string{"root.al1.dev2", "root.al1.dev3"}
+		measurements = [][]string{{"status"}, {"temperature"}}
+		dataTypes    = [][]client.TSDataType{{client.TEXT}, {client.TEXT}}
+		values       = [][]interface{}{{"33"}, {"44"}}
+		timestamps   = []int64{12, 13}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertAlignedRecords(deviceIds, measurements, dataTypes, values, timestamps))
+		sessionDataSet, err := session.ExecuteStatement("show devices")
+		if err == nil {
+			printDataSet0(sessionDataSet)
+			sessionDataSet.Close()
+		} else {
+			log.Println(err)
+		}
+		fmt.Println()
+	}
+
+}
+
+func insertRecordsOfOneDevice() {
+	ts := time.Now().UTC().UnixNano() / 1000000
+	var (
+		deviceId          = "root.sg1.dev0"
+		measurementsSlice = [][]string{
+			{"restart_count", "tick_count", "price"},
+			{"temperature", "description", "status"},
+		}
+		dataTypes = [][]client.TSDataType{
+			{client.INT32, client.INT64, client.DOUBLE},
+			{client.FLOAT, client.TEXT, client.BOOLEAN},
+		}
+		values = [][]interface{}{
+			{int32(1), int64(2018), float64(1988.1)},
+			{float32(12.1), "Test Device 1", false},
+		}
+		timestamps = []int64{ts, ts - 1}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+	}
+
+}
+
+func insertAlignedRecordsOfOneDevice() {
+	ts := time.Now().UTC().UnixNano() / 1000000
+	var (
+		deviceId          = "root.al1.dev4"
+		measurementsSlice = [][]string{
+			{"restart_count", "tick_count", "price"},
+			{"temperature", "description", "status"},
+		}
+		dataTypes = [][]client.TSDataType{
+			{client.INT32, client.INT64, client.DOUBLE},
+			{client.FLOAT, client.TEXT, client.BOOLEAN},
+		}
+		values = [][]interface{}{
+			{int32(1), int64(2018), float64(1988.1)},
+			{float32(12.1), "Test Device 1", false},
+		}
+		timestamps = []int64{ts, ts - 1}
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertAlignedRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
+		sessionDataSet, err := session.ExecuteStatement("show devices")
+		if err == nil {
+			printDataSet0(sessionDataSet)
+			sessionDataSet.Close()
+		} else {
+			log.Println(err)
+		}
+		sessionDataSetNew, err := session.ExecuteStatement("select restart_count,tick_count,price,temperature,description,status from  root.al1.dev4")
+		if err == nil {
+			printDataSet0(sessionDataSetNew)
+			sessionDataSetNew.Close()
+		} else {
+			log.Println(err)
+		}
+		fmt.Println()
+	}
+
+}
+
+func deleteData() {
+	var (
+		paths           = []string{"root.sg1.dev1.status"}
+		startTime int64 = 0
+		endTime   int64 = 12
+	)
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.DeleteData(paths, startTime, endTime))
+	}
+
+}
+
+func insertTablet() {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		if tablet, err := createTablet(12); err == nil {
+			status, err := session.InsertTablet(tablet, false)
+			checkError(status, err)
+		} else {
+			log.Fatal(err)
+		}
+	}
+
+}
+
+func insertAlignedTablet() {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		if tablet, err := createTablet(12); err == nil {
+			status, err := session.InsertAlignedTablet(tablet, false)
+			checkError(status, err)
+		} else {
+			log.Fatal(err)
+		}
+		var timeout int64 = 1000
+		if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
+			printDevice1(ds)
+			ds.Close()
+		} else {
+			log.Fatal(err)
+		}
+	}
+
+}
+
+func createTablet(rowCount int) (*client.Tablet, error) {
+	tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
+		{
+			Measurement: "restart_count",
+			DataType:    client.INT32,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "price",
+			DataType:    client.DOUBLE,
+			Encoding:    client.GORILLA,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "tick_count",
+			DataType:    client.INT64,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "temperature",
+			DataType:    client.FLOAT,
+			Encoding:    client.GORILLA,
+			Compressor:  client.SNAPPY,
+		}, {
+			Measurement: "description",
+			DataType:    client.TEXT,
+			Encoding:    client.PLAIN,
+			Compressor:  client.SNAPPY,
+		},
+		{
+			Measurement: "status",
+			DataType:    client.BOOLEAN,
+			Encoding:    client.RLE,
+			Compressor:  client.SNAPPY,
+		},
+	}, rowCount)
+
+	if err != nil {
+		return nil, err
+	}
+	ts := time.Now().UTC().UnixNano() / 1000000
+	for row := 0; row < int(rowCount); row++ {
+		ts++
+		tablet.SetTimestamp(ts, row)
+		tablet.SetValueAt(rand.Int31(), 0, row)
+		tablet.SetValueAt(rand.Float64(), 1, row)
+		tablet.SetValueAt(rand.Int63(), 2, row)
+		tablet.SetValueAt(rand.Float32(), 3, row)
+		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
+		tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+	}
+	return tablet, nil
+}
+
+func insertTablets() {
+	tablet1, err := createTablet(8)
+	if err != nil {
+		log.Fatal(err)
+	}
+	tablet2, err := createTablet(4)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	tablets := []*client.Tablet{tablet1, tablet2}
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertTablets(tablets, false))
+	}
+
+}
+
+func insertAlignedTablets() {
+	tablet1, err := createTablet(8)
+	if err != nil {
+		log.Fatal(err)
+	}
+	tablet2, err := createTablet(4)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	tablets := []*client.Tablet{tablet1, tablet2}
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.InsertAlignedTablets(tablets, false))
+	}
+
+}
+
+func setTimeZone() {
+	var timeZone = "GMT"
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		session.SetTimeZone(timeZone)
+	}
+
+}
+
+func getTimeZone() (string, error) {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		return session.GetTimeZone()
+	}
+	return "", err
+}
+
+func executeQueryStatement(sql string) {
+	var timeout int64 = 1000
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err != nil {
+		log.Print(err)
+		return
+	}
+	sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
+	if err == nil {
+		defer sessionDataSet.Close()
+		printDataSet1(sessionDataSet)
+	} else {
+		log.Println(err)
+	}
+}
+
+func executeStatement() {
+	var sql = "show storage group"
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err != nil {
+		log.Print(err)
+		return
+	}
+	sessionDataSet, err := session.ExecuteStatement(sql)
+	if err == nil {
+		printDataSet0(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
+}
+
+func executeRawDataQuery() {
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err != nil {
+		log.Print(err)
+		return
+	}
+	session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)")
+	var (
+		paths     []string = []string{"root.ln.wf02.wt02.s5"}
+		startTime int64    = 1
+		endTime   int64    = 200
+	)
+	sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
+	if err == nil {
+		printDataSet2(sessionDataSet)
+		sessionDataSet.Close()
+	} else {
+		log.Println(err)
+	}
+}
+
+func executeBatchStatement() {
+	var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
+		"insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err == nil {
+		checkError(session.ExecuteBatchStatement(sqls))
+	}
+
+}
+
+func printDevice1(sds *client.SessionDataSet) {
+	showTimestamp := !sds.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for _, columnName := range sds.GetColumnNames() {
+		fmt.Printf("%s\t", columnName)
+	}
+	fmt.Println()
+
+	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+
+		var restartCount int32
+		var price float64
+		var tickCount int64
+		var temperature float32
+		var description string
+		var status bool
+
+		// All of iotdb datatypes can be scan into string variables
+		// var restartCount string
+		// var price string
+		// var tickCount string
+		// var temperature string
+		// var description string
+		// var status string
+
+		if err := sds.Scan(&restartCount, &price, &tickCount, &temperature, &description, &status); err != nil {
+			log.Fatal(err)
+		}
+
+		whitespace := "\t\t"
+		fmt.Printf("%v%s", restartCount, whitespace)
+		fmt.Printf("%v%s", price, whitespace)
+		fmt.Printf("%v%s", tickCount, whitespace)
+		fmt.Printf("%v%s", temperature, whitespace)
+		fmt.Printf("%v%s", description, whitespace)
+		fmt.Printf("%v%s", status, whitespace)
+
+		fmt.Println()
+	}
+}
+
+func printDataSet0(sessionDataSet *client.SessionDataSet) {
+	showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+		fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+	}
+	fmt.Println()
+
+	for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName))
+		}
+		for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
+			columnName := sessionDataSet.GetColumnName(i)
+			switch sessionDataSet.GetColumnDataType(i) {
+			case client.BOOLEAN:
+				fmt.Print(sessionDataSet.GetBool(columnName))
+			case client.INT32:
+				fmt.Print(sessionDataSet.GetInt32(columnName))
+			case client.INT64:
+				fmt.Print(sessionDataSet.GetInt64(columnName))
+			case client.FLOAT:
+				fmt.Print(sessionDataSet.GetFloat(columnName))
+			case client.DOUBLE:
+				fmt.Print(sessionDataSet.GetDouble(columnName))
+			case client.TEXT:
+				fmt.Print(sessionDataSet.GetText(columnName))
+			default:
+			}
+			fmt.Print("\t\t")
+		}
+		fmt.Println()
+	}
+}
+
+func printDataSet1(sds *client.SessionDataSet) {
+	showTimestamp := !sds.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for i := 0; i < sds.GetColumnCount(); i++ {
+		fmt.Printf("%s\t", sds.GetColumnName(i))
+	}
+	fmt.Println()
+
+	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+		for i := 0; i < sds.GetColumnCount(); i++ {
+			columnName := sds.GetColumnName(i)
+			v := sds.GetValue(columnName)
+			if v == nil {
+				v = "null"
+			}
+			fmt.Printf("%v\t\t", v)
+		}
+		fmt.Println()
+	}
+}
+
+func printDataSet2(sds *client.SessionDataSet) {
+	showTimestamp := !sds.IsIgnoreTimeStamp()
+	if showTimestamp {
+		fmt.Print("Time\t\t\t\t")
+	}
+
+	for i := 0; i < sds.GetColumnCount(); i++ {
+		fmt.Printf("%s\t", sds.GetColumnName(i))
+	}
+	fmt.Println()
+
+	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
+		if showTimestamp {
+			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
+		}
+
+		if record, err := sds.GetRowRecord(); err == nil {
+			for _, field := range record.GetFields() {
+				v := field.GetValue()
+				if field.IsNull() {
+					v = "null"
+				}
+				fmt.Printf("%v\t\t", v)
+			}
+			fmt.Println()
+		}
+	}
+}
+
+func checkError(status *rpc.TSStatus, err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	if status != nil {
+		if err = client.VerifySuccess(status); err != nil {
+			log.Println(err)
+		}
+	}
+}
+
+// If your IoTDB deployment is a cluster or doubleLive setup, you can use the following code to connect through a session pool.
+func useSessionPool() {
+
+	config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+	defer sessionPool.Close()
+	session, err := sessionPool.GetSession()
+	defer sessionPool.PutBack(session)
+	if err != nil {
+		log.Print(err)
+		return
+	}
+
+}