You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/03/09 07:26:52 UTC

[iotdb-client-go] branch rel/0.13 updated: session pool supports multiple nodes (#78)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new d859381  session pool supports multiple nodes (#78)
d859381 is described below

commit d8593815906100d22c7b333bcc7d86d451bcaf5f
Author: Liwen Fu <fu...@gmail.com>
AuthorDate: Thu Mar 9 15:26:47 2023 +0800

    session pool supports multiple nodes (#78)
    
    Co-authored-by: fuliwen <fu...@yonyou.com>
---
 README.md                                    | 13 ++++++++++
 README_ZH.md                                 | 15 ++++++++++++
 client/session.go                            | 36 +++++++++++++++-------------
 client/sessionpool.go                        | 31 +++++++++++++++++++-----
 example/session_pool/session_pool_example.go | 35 +++++++++++++++++----------
 5 files changed, 94 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index cc2a26e..831c60b 100644
--- a/README.md
+++ b/README.md
@@ -86,6 +86,7 @@ If there is no available connections and the pool reaches its max size, the all
 The PutBack method must be called after use
 
 ### New sessionPool
+standalone
 
 ```golang
 
@@ -97,6 +98,18 @@ config := &client.PoolConfig{
 }
 sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
 
+```
+cluster or doubleLive
+
+```golang
+
+config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
 ```
 
 ### Get session through sessionPool, putback after use
diff --git a/README_ZH.md b/README_ZH.md
index 13d4ff8..3e88438 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -69,6 +69,7 @@ go run session_example.go
 
 ### 创建sessionPool
 
+单实例
 ```golang
 
 config := &client.PoolConfig{
@@ -81,6 +82,20 @@ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
 
 ```
 
+分布式或双活
+
+```golang
+
+config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+
+```
+
+
 ### 使用sessionPool获取session,使用完手动调用PutBack
 
 例1:设置存储组
diff --git a/client/session.go b/client/session.go
index c2a29e7..f26699a 100644
--- a/client/session.go
+++ b/client/session.go
@@ -118,11 +118,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
 }
 
 type ClusterConfig struct {
-	NodeUrls  []string //ip:port
-	UserName  string
-	Password  string
-	FetchSize int32
-	TimeZone  string
+	NodeUrls        []string //ip:port
+	UserName        string
+	Password        string
+	FetchSize       int32
+	TimeZone        string
+	ConnectRetryMax int
 }
 
 type ClusterSession struct {
@@ -975,12 +976,12 @@ func NewSession(config *Config) Session {
 	return Session{config: config}
 }
 
-func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+func NewClusterSession(clusterConfig *ClusterConfig) Session {
 	session := Session{}
 	node := endPoint{}
-	for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
-		node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
-		node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+	for i := 0; i < len(clusterConfig.NodeUrls); i++ {
+		node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
+		node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
 		endPointList.PushBack(node)
 	}
 	var err error
@@ -996,7 +997,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session {
 					log.Println(err)
 				} else {
 					session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
-						ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+						clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
 					break
 				}
 			}
@@ -1052,14 +1053,15 @@ func (s *Session) initClusterConn(node endPoint) error {
 
 }
 
-func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
 	return &Config{
-		Host:      host,
-		Port:      port,
-		UserName:  userName,
-		Password:  passWord,
-		FetchSize: fetchSize,
-		TimeZone:  timeZone,
+		Host:            host,
+		Port:            port,
+		UserName:        userName,
+		Password:        passWord,
+		FetchSize:       fetchSize,
+		TimeZone:        timeZone,
+		ConnectRetryMax: connectRetryMax,
 	}
 }
 
diff --git a/client/sessionpool.go b/client/sessionpool.go
index dbcb7bb..156ce2a 100644
--- a/client/sessionpool.go
+++ b/client/sessionpool.go
@@ -78,7 +78,7 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
 				if ok {
 					return session, nil
 				} else {
-					log.Println("sessionpool has closed")
+					log.Println("sessionPool has closed")
 					return session, errPoolClosed
 				}
 			default:
@@ -93,11 +93,19 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
 	}
 }
 
-func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session, error) {
-	session := NewSession(getSessionConfig(config))
-	if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
-		log.Print(err)
-		return session, err
+func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
+	if len(config.NodeUrls) > 0 {
+		session = NewClusterSession(getClusterSessionConfig(config))
+		if err := session.OpenCluster(spool.enableCompression); err != nil {
+			log.Print(err)
+			return session, err
+		}
+	} else {
+		session = NewSession(getSessionConfig(config))
+		if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
+			log.Print(err)
+			return session, err
+		}
 	}
 	return session, nil
 }
@@ -114,6 +122,17 @@ func getSessionConfig(config *PoolConfig) *Config {
 	}
 }
 
+// getClusterSessionConfig copies the node URLs, credentials, fetch size, time zone and retry limit of a PoolConfig into a new ClusterConfig.
+func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
+	clusterConfig := &ClusterConfig{NodeUrls: config.NodeUrls}
+	clusterConfig.UserName = config.UserName
+	clusterConfig.Password = config.Password
+	clusterConfig.FetchSize = config.FetchSize
+	clusterConfig.TimeZone = config.TimeZone
+	clusterConfig.ConnectRetryMax = config.ConnectRetryMax
+	return clusterConfig
+}
+
 func (spool *SessionPool) PutBack(session Session) {
 	if session.trans.IsOpen() {
 		spool.ch <- session
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
index 26a7fb7..c43a4fb 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"log"
 	"math/rand"
+	"strings"
 	"sync"
 	"time"
 
@@ -60,8 +61,8 @@ func main() {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			setStorageGroup(fmt.Sprintf("root.ln%d", j))
-			deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
+			setStorageGroup(fmt.Sprintf("root.ln-%d", j))
+			deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
 
 		}()
 
@@ -134,17 +135,6 @@ func main() {
 	insertAlignedTablets()
 	deleteTimeseries("root.ln.device1.*")
 	executeQueryStatement("show timeseries root.**")
-	for i := 0; i < 10000; i++ {
-		var j = i
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			setStorageGroup(fmt.Sprintf("root.ln%d", j))
-			deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
-
-		}()
-
-	}
 	wg.Wait()
 
 }
@@ -773,3 +763,22 @@ func checkError(status *rpc.TSStatus, err error) {
 		}
 	}
 }
+
+// useSessionPool shows how to connect a session pool to a cluster or doubleLive IoTDB deployment by listing several node URLs.
+func useSessionPool() {
+
+	config := &client.PoolConfig{
+		UserName: user,
+		Password: password,
+		NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
+	}
+	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
+	defer sessionPool.Close()
+	session, err := sessionPool.GetSession()
+	if err != nil {
+		log.Print(err)
+		return
+	}
+	// Defer PutBack only on success: PutBack reads session.trans, which may be unset when GetSession fails.
+	defer sessionPool.PutBack(session)
+}