You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/12/09 03:22:09 UTC
[iotdb-client-go] branch main updated: Support connecting multiple nodes of the cluster (#25)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/main by this push:
new 41453fd Support connecting multiple nodes of the cluster (#25)
41453fd is described below
commit 41453fd1d9cfcc26abd8b403a58a5cbda8f6f9f0
Author: Hang Ji <55...@users.noreply.github.com>
AuthorDate: Thu Dec 9 11:22:03 2021 +0800
Support connecting multiple nodes of the cluster (#25)
---
client/session.go | 301 +++++++++++++++++++++++++++++++++++++++++++--
example/session_example.go | 16 ++-
test/e2e/e2e_test.go | 2 +-
3 files changed, 309 insertions(+), 10 deletions(-)
diff --git a/client/session.go b/client/session.go
index 9c89662..c5b4192 100644
--- a/client/session.go
+++ b/client/session.go
@@ -21,13 +21,16 @@ package client
import (
"bytes"
+ "container/list"
"context"
"encoding/binary"
"errors"
"fmt"
+ "log"
"net"
"reflect"
"sort"
+ "strings"
"time"
"github.com/apache/iotdb-client-go/rpc"
@@ -58,6 +61,14 @@ type Session struct {
requestStatementId int64
}
+type endPoint struct {
+ Host string
+ Port string
+}
+
+var endPointList = list.New()
+var session Session
+
func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error {
if s.config.FetchSize <= 0 {
s.config.FetchSize = DefaultFetchSize
@@ -70,7 +81,7 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
var err error
s.trans, err = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{
- ConnectTimeout: time.Duration(connectionTimeoutInMs), // Use 0 for no timeout
+ ConnectTimeout: time.Duration(connectionTimeoutInMs), // Use 0 for no timeout
})
if err != nil {
return err
@@ -107,6 +118,59 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
return err
}
+type ClusterConfig struct {
+ NodeUrls []string //ip:port
+ UserName string
+ Password string
+ FetchSize int32
+ TimeZone string
+}
+
+type ClusterSession struct {
+ config *ClusterConfig
+ client *rpc.TSIServiceClient
+ sessionId int64
+ trans thrift.TTransport
+ requestStatementId int64
+}
+
+func (s *Session) OpenCluster(enableRPCCompression bool) error {
+ if s.config.FetchSize <= 0 {
+ s.config.FetchSize = DefaultFetchSize
+ }
+ if s.config.TimeZone == "" {
+ s.config.TimeZone = DefaultTimeZone
+ }
+
+ var protocolFactory thrift.TProtocolFactory
+ var err error
+
+ if enableRPCCompression {
+ protocolFactory = thrift.NewTCompactProtocolFactory()
+ } else {
+ protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
+ }
+ iprot := protocolFactory.GetProtocol(s.trans)
+ oprot := protocolFactory.GetProtocol(s.trans)
+ s.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+ req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: &s.config.UserName,
+ Password: &s.config.Password}
+ fmt.Println(req)
+ resp, err := s.client.OpenSession(context.Background(), &req)
+ if err != nil {
+ return err
+ }
+ s.sessionId = resp.GetSessionId()
+ s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
+ if err != nil {
+ return err
+ }
+
+ s.SetTimeZone(s.config.TimeZone)
+ s.config.TimeZone, err = s.GetTimeZone()
+ return err
+}
+
func (s *Session) Close() (r *rpc.TSStatus, err error) {
req := rpc.NewTSCloseSessionReq()
req.SessionId = s.sessionId
@@ -126,6 +190,11 @@ func (s *Session) Close() (r *rpc.TSStatus, err error) {
*/
func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
+ }
+ }
return r, err
}
@@ -138,6 +207,11 @@ func (s *Session) SetStorageGroup(storageGroupId string) (r *rpc.TSStatus, err e
*/
func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, err error) {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
+ }
+ }
return r, err
}
@@ -150,6 +224,11 @@ func (s *Session) DeleteStorageGroup(storageGroupId string) (r *rpc.TSStatus, er
*/
func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *rpc.TSStatus, err error) {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
+ }
+ }
return r, err
}
@@ -167,6 +246,11 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS
request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
Compressor: int32(compressor), Attributes: attributes, Tags: tags}
status, err := s.client.CreateTimeseries(context.Background(), &request)
+ if err != nil && status == nil {
+ if reconnect() {
+ status, err = session.client.CreateTimeseries(context.Background(), &request)
+ }
+ }
return status, err
}
@@ -200,6 +284,12 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType,
Encodings: destEncodings, Compressors: destCompressions}
r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.CreateMultiTimeseries(context.Background(), &request)
+ }
+ }
+
return r, err
}
@@ -212,6 +302,11 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType,
*/
func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error) {
r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
+ }
+ }
return r, err
}
@@ -227,6 +322,11 @@ func (s *Session) DeleteTimeseries(paths []string) (r *rpc.TSStatus, err error)
func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *rpc.TSStatus, err error) {
request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime}
r, err = s.client.DeleteData(context.Background(), &request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.DeleteData(context.Background(), &request)
+ }
+ }
return r, err
}
@@ -244,6 +344,11 @@ func (s *Session) InsertStringRecord(deviceId string, measurements []string, val
request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, DeviceId: deviceId, Measurements: measurements,
Values: values, Timestamp: timestamp}
r, err = s.client.InsertStringRecord(context.Background(), &request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertStringRecord(context.Background(), &request)
+ }
+ }
return r, err
}
@@ -270,6 +375,13 @@ func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
FetchSize: &s.config.FetchSize,
}
resp, err := s.client.ExecuteStatement(context.Background(), &request)
+
+ if err != nil && resp == nil {
+ if reconnect() {
+ resp, err = session.client.ExecuteStatement(context.Background(), &request)
+ }
+ }
+
return s.genDataSet(sql, resp), err
}
@@ -283,6 +395,14 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD
return nil, statusErr
}
} else {
+ if reconnect() {
+ resp, err = session.client.ExecuteQueryStatement(context.Background(), &request)
+ if statusErr := VerifySuccess(resp.Status); statusErr == nil {
+ return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ } else {
+ return nil, statusErr
+ }
+ }
return nil, err
}
}
@@ -311,6 +431,13 @@ func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes
return nil, err
}
r, err = s.client.InsertRecord(context.Background(), request)
+
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertRecord(context.Background(), request)
+ }
+ }
+
return r, err
}
@@ -369,7 +496,16 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64,
MeasurementsList: measurementsSlice,
ValuesList: valuesList,
}
- return s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+ r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
+
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertRecordsOfOneDevice(context.Background(), request)
+ }
+ }
+
+ return r, err
}
/*
@@ -390,7 +526,13 @@ func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dat
if err != nil {
return nil, err
} else {
- return s.client.InsertRecords(context.Background(), request)
+ r, err = s.client.InsertRecords(context.Background(), request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertRecords(context.Background(), request)
+ }
+ }
+ return r, err
}
}
@@ -411,7 +553,13 @@ func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *rpc.TSStatus
if err != nil {
return nil, err
}
- return s.client.InsertTablets(context.Background(), request)
+ r, err = s.client.InsertTablets(context.Background(), request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertTablets(context.Background(), request)
+ }
+ }
+ return r, err
}
func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err error) {
@@ -419,7 +567,13 @@ func (s *Session) ExecuteBatchStatement(inserts []string) (r *rpc.TSStatus, err
SessionId: s.sessionId,
Statements: inserts,
}
- return s.client.ExecuteBatchStatement(context.Background(), &request)
+ r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.ExecuteBatchStatement(context.Background(), &request)
+ }
+ }
+ return r, err
}
func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) {
@@ -432,6 +586,13 @@ func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime i
StatementId: s.requestStatementId,
}
resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request)
+
+ if err != nil && resp == nil {
+ if reconnect() {
+ resp, err = session.client.ExecuteRawDataQuery(context.Background(), &request)
+ }
+ }
+
return s.genDataSet("", resp), err
}
@@ -443,6 +604,13 @@ func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
FetchSize: &s.config.FetchSize,
}
resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request)
+
+ if err != nil && resp == nil {
+ if reconnect() {
+ resp, err = session.client.ExecuteUpdateStatement(context.Background(), &request)
+ }
+ }
+
return s.genDataSet(sql, resp), err
}
@@ -581,7 +749,16 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *rpc.TSStatus, er
if err != nil {
return nil, err
}
- return s.client.InsertTablet(context.Background(), request)
+
+ r, err = s.client.InsertTablet(context.Background(), request)
+
+ if err != nil && r == nil {
+ if reconnect() {
+ r, err = session.client.InsertTablet(context.Background(), request)
+ }
+ }
+
+ return r, err
}
func (s *Session) genTSInsertTabletReq(tablet *Tablet) (*rpc.TSInsertTabletReq, error) {
@@ -605,6 +782,114 @@ func (s *Session) GetSessionId() int64 {
return s.sessionId
}
-func NewSession(config *Config) *Session {
- return &Session{config: config}
+func NewSession(config *Config) Session {
+ endPoint := endPoint{}
+ endPoint.Host = config.Host
+ endPoint.Port = config.Port
+ endPointList.PushBack(endPoint)
+ return Session{config: config}
+}
+
+func NewClusterSession(ClusterConfig *ClusterConfig) Session {
+
+ node := endPoint{}
+ for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
+ node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
+ node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
+ endPointList.PushBack(node)
+ }
+ var err error
+ for e := endPointList.Front(); e != nil; e = e.Next() {
+ session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{
+ ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+ })
+ if err == nil {
+ session.trans = thrift.NewTFramedTransport(session.trans)
+ if !session.trans.IsOpen() {
+ err = session.trans.Open()
+ if err != nil {
+ log.Println(err)
+ } else {
+ session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
+ ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
+ break
+ }
+ }
+ }
+ }
+ if err != nil {
+ log.Fatal("No Server Can Connect")
+ }
+ return session
+}
+
+func initClusterConn(node endPoint) error {
+ var err error
+
+ session.trans, err = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{
+ ConnectTimeout: time.Duration(0), // Use 0 for no timeout
+ })
+ if err == nil {
+ session.trans = thrift.NewTFramedTransport(session.trans)
+ if !session.trans.IsOpen() {
+ err = session.trans.Open()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ var protocolFactory thrift.TProtocolFactory
+ protocolFactory = thrift.NewTBinaryProtocolFactoryDefault()
+ iprot := protocolFactory.GetProtocol(session.trans)
+ oprot := protocolFactory.GetProtocol(session.trans)
+ session.client = rpc.NewTSIServiceClient(thrift.NewTStandardClient(iprot, oprot))
+ req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: session.config.TimeZone, Username: &session.config.UserName,
+ Password: &session.config.Password}
+ fmt.Println(req)
+ resp, err := session.client.OpenSession(context.Background(), &req)
+ if err != nil {
+ return err
+ }
+ session.sessionId = resp.GetSessionId()
+ session.requestStatementId, err = session.client.RequestStatementId(context.Background(), session.sessionId)
+ if err != nil {
+ return err
+ }
+
+ session.SetTimeZone(session.config.TimeZone)
+ session.config.TimeZone, err = session.GetTimeZone()
+ return err
+
+}
+
+func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
+ return &Config{
+ Host: host,
+ Port: port,
+ UserName: userName,
+ Password: passWord,
+ FetchSize: fetchSize,
+ TimeZone: timeZone,
+ }
+}
+
+func reconnect() bool {
+ var err error
+ var connectedSuccess = false
+
+ for i := 0; i < 3; i++ {
+ for e := endPointList.Front(); e != nil; e = e.Next() {
+ err = initClusterConn(e.Value.(endPoint))
+ if err == nil {
+ connectedSuccess = true
+ break
+ } else {
+ log.Println("Connection refused:", e.Value.(endPoint))
+ }
+ }
+ if connectedSuccess {
+ break
+ }
+ }
+ return connectedSuccess
}
diff --git a/example/session_example.go b/example/session_example.go
index 234d8d3..c10154b 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -24,6 +24,7 @@ import (
"fmt"
"log"
"math/rand"
+ "strings"
"time"
"github.com/apache/iotdb-client-go/client"
@@ -36,7 +37,7 @@ var (
user string
password string
)
-var session *client.Session
+var session client.Session
func main() {
flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
@@ -489,3 +490,16 @@ func checkError(status *rpc.TSStatus, err error) {
}
}
}
+
+// If your IotDB is a cluster version, you can use the following code for multi node connection
+func connectCluster() {
+ config := &client.ClusterConfig{
+ NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669", ","),
+ UserName: "root",
+ Password: "root",
+ }
+ session = client.NewClusterSession(config)
+ if err := session.OpenCluster(false); err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 163bb34..645ac60 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -30,7 +30,7 @@ import (
type e2eTestSuite struct {
suite.Suite
- session *client.Session
+ session client.Session
}
func TestE2ETestSuite(t *testing.T) {