You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/06/22 10:02:14 UTC
[iotdb-client-go] branch main updated: Synchronize with master's rpc.thrift (#19)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/main by this push:
new 81d6f54 Synchronize with master's rpc.thrift (#19)
81d6f54 is described below
commit 81d6f5493e17cec47cc95164b147372ba373c365
Author: Mark Liu <ma...@gmail.com>
AuthorDate: Tue Jun 22 18:01:24 2021 +0800
Synchronize with master's rpc.thrift (#19)
---
client/rpcdataset.go | 4 +-
client/rpcdataset_test.go | 2 +-
client/session.go | 6 +-
client/sessiondataset.go | 2 +-
example/session_example.go | 6 +-
rpc/rpc.go | 164 +++++++++++++++++++++++++++++++++++++--------
test/e2e/e2e_test.go | 4 +-
7 files changed, 150 insertions(+), 38 deletions(-)
diff --git a/client/rpcdataset.go b/client/rpcdataset.go
index 6601dca..631afaa 100644
--- a/client/rpcdataset.go
+++ b/client/rpcdataset.go
@@ -67,7 +67,7 @@ type IoTDBRpcDataSet struct {
emptyResultSet bool
ignoreTimeStamp bool
closed bool
- timeoutMs int64
+ timeoutMs *int64
}
func (s *IoTDBRpcDataSet) getColumnIndex(columnName string) int32 {
@@ -510,7 +510,7 @@ func (s *IoTDBRpcDataSet) Close() (err error) {
func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypes []string,
columnNameIndex map[string]int32,
queryId int64, client *rpc.TSIServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet,
- ignoreTimeStamp bool, fetchSize int32, timeoutMs int64) *IoTDBRpcDataSet {
+ ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64) *IoTDBRpcDataSet {
ds := &IoTDBRpcDataSet{
sql: sql,
diff --git a/client/rpcdataset_test.go b/client/rpcdataset_test.go
index a8a493d..43cd0ba 100644
--- a/client/rpcdataset_test.go
+++ b/client/rpcdataset_test.go
@@ -46,7 +46,7 @@ func createIoTDBRpcDataSet() *IoTDBRpcDataSet {
ValueList: [][]byte{{0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 20 [...]
BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248}},
}
- return NewIoTDBRpcDataSet("select * from root.ln.device1", columns, dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false, DefaultFetchSize, 0)
+ return NewIoTDBRpcDataSet("select * from root.ln.device1", columns, dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false, DefaultFetchSize, nil)
}
func TestIoTDBRpcDataSet_getColumnType(t *testing.T) {
diff --git a/client/session.go b/client/session.go
index 8b99965..e345f57 100644
--- a/client/session.go
+++ b/client/session.go
@@ -270,9 +270,9 @@ func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
return s.genDataSet(sql, resp), err
}
-func (s *Session) ExecuteQueryStatement(sql string, timeoutMs int64) (*SessionDataSet, error) {
+func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
- FetchSize: &s.config.FetchSize, Timeout: &timeoutMs}
+ FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
@@ -444,7 +444,7 @@ func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
}
func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet {
- return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, 0)
+ return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
}
func (s *Session) genInsertTabletsReq(tablets []*Tablet) (*rpc.TSInsertTabletsReq, error) {
diff --git a/client/sessiondataset.go b/client/sessiondataset.go
index c0237fb..2651630 100644
--- a/client/sessiondataset.go
+++ b/client/sessiondataset.go
@@ -111,7 +111,7 @@ func (s *SessionDataSet) Close() error {
func NewSessionDataSet(sql string, columnNameList []string, columnTypeList []string,
columnNameIndex map[string]int32,
queryId int64, client *rpc.TSIServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet,
- ignoreTimeStamp bool, fetchSize int32, timeoutMs int64) *SessionDataSet {
+ ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64) *SessionDataSet {
return &SessionDataSet{
ioTDBRpcDataSet: NewIoTDBRpcDataSet(sql, columnNameList, columnTypeList,
diff --git a/example/session_example.go b/example/session_example.go
index 0def366..234d8d3 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -79,7 +79,8 @@ func main() {
deleteTimeseries("root.sg1.dev1.status")
insertTablet()
- if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", 1000); err == nil {
+ var timeout int64 = 1000
+ if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
printDevice1(ds)
ds.Close()
} else {
@@ -445,7 +446,8 @@ func executeStatement() {
}
func executeQueryStatement(sql string) {
- sessionDataSet, err := session.ExecuteQueryStatement(sql, 1000)
+ var timeout int64 = 1000
+ sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
printDataSet1(sessionDataSet)
sessionDataSet.Close()
diff --git a/rpc/rpc.go b/rpc/rpc.go
index 73c0b60..3c9fbca 100644
--- a/rpc/rpc.go
+++ b/rpc/rpc.go
@@ -2273,12 +2273,14 @@ func (p *TSCloseSessionReq) String() string {
// - StatementId
// - FetchSize
// - Timeout
+// - EnableRedirectQuery
type TSExecuteStatementReq struct {
- SessionId int64 `thrift:"sessionId,1,required" db:"sessionId" json:"sessionId"`
- Statement string `thrift:"statement,2,required" db:"statement" json:"statement"`
- StatementId int64 `thrift:"statementId,3,required" db:"statementId" json:"statementId"`
- FetchSize *int32 `thrift:"fetchSize,4" db:"fetchSize" json:"fetchSize,omitempty"`
- Timeout *int64 `thrift:"timeout,5" db:"timeout" json:"timeout,omitempty"`
+ SessionId int64 `thrift:"sessionId,1,required" db:"sessionId" json:"sessionId"`
+ Statement string `thrift:"statement,2,required" db:"statement" json:"statement"`
+ StatementId int64 `thrift:"statementId,3,required" db:"statementId" json:"statementId"`
+ FetchSize *int32 `thrift:"fetchSize,4" db:"fetchSize" json:"fetchSize,omitempty"`
+ Timeout *int64 `thrift:"timeout,5" db:"timeout" json:"timeout,omitempty"`
+ EnableRedirectQuery *bool `thrift:"enableRedirectQuery,6" db:"enableRedirectQuery" json:"enableRedirectQuery,omitempty"`
}
func NewTSExecuteStatementReq() *TSExecuteStatementReq {
@@ -2314,6 +2316,15 @@ func (p *TSExecuteStatementReq) GetTimeout() int64 {
}
return *p.Timeout
}
+
+var TSExecuteStatementReq_EnableRedirectQuery_DEFAULT bool
+
+func (p *TSExecuteStatementReq) GetEnableRedirectQuery() bool {
+ if !p.IsSetEnableRedirectQuery() {
+ return TSExecuteStatementReq_EnableRedirectQuery_DEFAULT
+ }
+ return *p.EnableRedirectQuery
+}
func (p *TSExecuteStatementReq) IsSetFetchSize() bool {
return p.FetchSize != nil
}
@@ -2322,6 +2333,10 @@ func (p *TSExecuteStatementReq) IsSetTimeout() bool {
return p.Timeout != nil
}
+func (p *TSExecuteStatementReq) IsSetEnableRedirectQuery() bool {
+ return p.EnableRedirectQuery != nil
+}
+
func (p *TSExecuteStatementReq) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -2393,6 +2408,16 @@ func (p *TSExecuteStatementReq) Read(iprot thrift.TProtocol) error {
return err
}
}
+ case 6:
+ if fieldTypeId == thrift.BOOL {
+ if err := p.ReadField6(iprot); err != nil {
+ return err
+ }
+ } else {
+ if err := iprot.Skip(fieldTypeId); err != nil {
+ return err
+ }
+ }
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -2462,6 +2487,15 @@ func (p *TSExecuteStatementReq) ReadField5(iprot thrift.TProtocol) error {
return nil
}
+func (p *TSExecuteStatementReq) ReadField6(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadBool(); err != nil {
+ return thrift.PrependError("error reading field 6: ", err)
+ } else {
+ p.EnableRedirectQuery = &v
+ }
+ return nil
+}
+
func (p *TSExecuteStatementReq) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("TSExecuteStatementReq"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
@@ -2482,6 +2516,9 @@ func (p *TSExecuteStatementReq) Write(oprot thrift.TProtocol) error {
if err := p.writeField5(oprot); err != nil {
return err
}
+ if err := p.writeField6(oprot); err != nil {
+ return err
+ }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
@@ -2561,6 +2598,21 @@ func (p *TSExecuteStatementReq) writeField5(oprot thrift.TProtocol) (err error)
return err
}
+func (p *TSExecuteStatementReq) writeField6(oprot thrift.TProtocol) (err error) {
+ if p.IsSetEnableRedirectQuery() {
+ if err := oprot.WriteFieldBegin("enableRedirectQuery", thrift.BOOL, 6); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:enableRedirectQuery: ", p), err)
+ }
+ if err := oprot.WriteBool(bool(*p.EnableRedirectQuery)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.enableRedirectQuery (6) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 6:enableRedirectQuery: ", p), err)
+ }
+ }
+ return err
+}
+
func (p *TSExecuteStatementReq) String() string {
if p == nil {
return "<nil>"
@@ -3264,7 +3316,7 @@ type TSFetchResultsReq struct {
FetchSize int32 `thrift:"fetchSize,3,required" db:"fetchSize" json:"fetchSize"`
QueryId int64 `thrift:"queryId,4,required" db:"queryId" json:"queryId"`
IsAlign bool `thrift:"isAlign,5,required" db:"isAlign" json:"isAlign"`
- Timeout int64 `thrift:"timeout,6,required" db:"timeout" json:"timeout"`
+ Timeout *int64 `thrift:"timeout,6" db:"timeout" json:"timeout,omitempty"`
}
func NewTSFetchResultsReq() *TSFetchResultsReq {
@@ -3291,9 +3343,18 @@ func (p *TSFetchResultsReq) GetIsAlign() bool {
return p.IsAlign
}
+var TSFetchResultsReq_Timeout_DEFAULT int64
+
func (p *TSFetchResultsReq) GetTimeout() int64 {
- return p.Timeout
+ if !p.IsSetTimeout() {
+ return TSFetchResultsReq_Timeout_DEFAULT
+ }
+ return *p.Timeout
}
+func (p *TSFetchResultsReq) IsSetTimeout() bool {
+ return p.Timeout != nil
+}
+
func (p *TSFetchResultsReq) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -3304,7 +3365,6 @@ func (p *TSFetchResultsReq) Read(iprot thrift.TProtocol) error {
var issetFetchSize bool = false
var issetQueryId bool = false
var issetIsAlign bool = false
- var issetTimeout bool = false
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
@@ -3375,7 +3435,6 @@ func (p *TSFetchResultsReq) Read(iprot thrift.TProtocol) error {
if err := p.ReadField6(iprot); err != nil {
return err
}
- issetTimeout = true
} else {
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -3408,9 +3467,6 @@ func (p *TSFetchResultsReq) Read(iprot thrift.TProtocol) error {
if !issetIsAlign {
return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field IsAlign is not set"))
}
- if !issetTimeout {
- return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field Timeout is not set"))
- }
return nil
}
@@ -3463,7 +3519,7 @@ func (p *TSFetchResultsReq) ReadField6(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 6: ", err)
} else {
- p.Timeout = v
+ p.Timeout = &v
}
return nil
}
@@ -3567,14 +3623,16 @@ func (p *TSFetchResultsReq) writeField5(oprot thrift.TProtocol) (err error) {
}
func (p *TSFetchResultsReq) writeField6(oprot thrift.TProtocol) (err error) {
- if err := oprot.WriteFieldBegin("timeout", thrift.I64, 6); err != nil {
- return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:timeout: ", p), err)
- }
- if err := oprot.WriteI64(int64(p.Timeout)); err != nil {
- return thrift.PrependError(fmt.Sprintf("%T.timeout (6) field write error: ", p), err)
- }
- if err := oprot.WriteFieldEnd(); err != nil {
- return thrift.PrependError(fmt.Sprintf("%T write field end error 6:timeout: ", p), err)
+ if p.IsSetTimeout() {
+ if err := oprot.WriteFieldBegin("timeout", thrift.I64, 6); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:timeout: ", p), err)
+ }
+ if err := oprot.WriteI64(int64(*p.Timeout)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.timeout (6) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 6:timeout: ", p), err)
+ }
}
return err
}
@@ -8331,13 +8389,15 @@ func (p *TSCreateTimeseriesReq) String() string {
// - StartTime
// - EndTime
// - StatementId
+// - EnableRedirectQuery
type TSRawDataQueryReq struct {
- SessionId int64 `thrift:"sessionId,1,required" db:"sessionId" json:"sessionId"`
- Paths []string `thrift:"paths,2,required" db:"paths" json:"paths"`
- FetchSize *int32 `thrift:"fetchSize,3" db:"fetchSize" json:"fetchSize,omitempty"`
- StartTime int64 `thrift:"startTime,4,required" db:"startTime" json:"startTime"`
- EndTime int64 `thrift:"endTime,5,required" db:"endTime" json:"endTime"`
- StatementId int64 `thrift:"statementId,6,required" db:"statementId" json:"statementId"`
+ SessionId int64 `thrift:"sessionId,1,required" db:"sessionId" json:"sessionId"`
+ Paths []string `thrift:"paths,2,required" db:"paths" json:"paths"`
+ FetchSize *int32 `thrift:"fetchSize,3" db:"fetchSize" json:"fetchSize,omitempty"`
+ StartTime int64 `thrift:"startTime,4,required" db:"startTime" json:"startTime"`
+ EndTime int64 `thrift:"endTime,5,required" db:"endTime" json:"endTime"`
+ StatementId int64 `thrift:"statementId,6,required" db:"statementId" json:"statementId"`
+ EnableRedirectQuery *bool `thrift:"enableRedirectQuery,7" db:"enableRedirectQuery" json:"enableRedirectQuery,omitempty"`
}
func NewTSRawDataQueryReq() *TSRawDataQueryReq {
@@ -8372,10 +8432,23 @@ func (p *TSRawDataQueryReq) GetEndTime() int64 {
func (p *TSRawDataQueryReq) GetStatementId() int64 {
return p.StatementId
}
+
+var TSRawDataQueryReq_EnableRedirectQuery_DEFAULT bool
+
+func (p *TSRawDataQueryReq) GetEnableRedirectQuery() bool {
+ if !p.IsSetEnableRedirectQuery() {
+ return TSRawDataQueryReq_EnableRedirectQuery_DEFAULT
+ }
+ return *p.EnableRedirectQuery
+}
func (p *TSRawDataQueryReq) IsSetFetchSize() bool {
return p.FetchSize != nil
}
+func (p *TSRawDataQueryReq) IsSetEnableRedirectQuery() bool {
+ return p.EnableRedirectQuery != nil
+}
+
func (p *TSRawDataQueryReq) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -8461,6 +8534,16 @@ func (p *TSRawDataQueryReq) Read(iprot thrift.TProtocol) error {
return err
}
}
+ case 7:
+ if fieldTypeId == thrift.BOOL {
+ if err := p.ReadField7(iprot); err != nil {
+ return err
+ }
+ } else {
+ if err := iprot.Skip(fieldTypeId); err != nil {
+ return err
+ }
+ }
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -8558,6 +8641,15 @@ func (p *TSRawDataQueryReq) ReadField6(iprot thrift.TProtocol) error {
return nil
}
+func (p *TSRawDataQueryReq) ReadField7(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadBool(); err != nil {
+ return thrift.PrependError("error reading field 7: ", err)
+ } else {
+ p.EnableRedirectQuery = &v
+ }
+ return nil
+}
+
func (p *TSRawDataQueryReq) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("TSRawDataQueryReq"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
@@ -8581,6 +8673,9 @@ func (p *TSRawDataQueryReq) Write(oprot thrift.TProtocol) error {
if err := p.writeField6(oprot); err != nil {
return err
}
+ if err := p.writeField7(oprot); err != nil {
+ return err
+ }
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
@@ -8679,6 +8774,21 @@ func (p *TSRawDataQueryReq) writeField6(oprot thrift.TProtocol) (err error) {
return err
}
+func (p *TSRawDataQueryReq) writeField7(oprot thrift.TProtocol) (err error) {
+ if p.IsSetEnableRedirectQuery() {
+ if err := oprot.WriteFieldBegin("enableRedirectQuery", thrift.BOOL, 7); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 7:enableRedirectQuery: ", p), err)
+ }
+ if err := oprot.WriteBool(bool(*p.EnableRedirectQuery)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.enableRedirectQuery (7) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 7:enableRedirectQuery: ", p), err)
+ }
+ }
+ return err
+}
+
func (p *TSRawDataQueryReq) String() string {
if p == nil {
return "<nil>"
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 753407d..163bb34 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -79,7 +79,7 @@ func (s *e2eTestSuite) Test_CreateTimeseries() {
compressor = client.SNAPPY
)
s.checkError(s.session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
- ds, err := s.session.ExecuteQueryStatement("show timeseries root.tsg1.dev1.status", 1000)
+ ds, err := s.session.ExecuteQueryStatement("show timeseries root.tsg1.dev1.status", nil)
assert := s.Require()
@@ -101,7 +101,7 @@ func (s *e2eTestSuite) Test_InsertRecords() {
)
s.checkError(s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
- ds, err := s.session.ExecuteQueryStatement("select status from root.tsg1.dev1", 1000)
+ ds, err := s.session.ExecuteQueryStatement("select status from root.tsg1.dev1", nil)
assert := s.Require()
assert.NoError(err)
defer ds.Close()