You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/05/24 07:39:40 UTC

[iotdb-client-go] branch optimize_tablet created (now 77eb1d6)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch optimize_tablet
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git


      at 77eb1d6  optimize the implement of tablet

This branch includes the following new commits:

     new 77eb1d6  optimize the implement of tablet

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb-client-go] 01/01: optimize the implement of tablet

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch optimize_tablet
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git

commit 77eb1d671bf0c2172db34669fcc43dfeaeb6c9b9
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed May 24 15:39:28 2023 +0800

    optimize the implement of tablet
---
 client/session.go                            |  4 +--
 client/tablet.go                             | 48 +++++++++++++-------------
 client/tablet_test.go                        | 50 +---------------------------
 example/session_example.go                   | 15 ++-------
 example/session_pool/session_pool_example.go | 23 ++++++-------
 test/e2e/e2e_test.go                         | 16 +++------
 6 files changed, 46 insertions(+), 110 deletions(-)

diff --git a/client/session.go b/client/session.go
index 1851d91..828f7ea 100644
--- a/client/session.go
+++ b/client/session.go
@@ -821,7 +821,7 @@ func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.T
 		valuesList[index] = values
 		timestampsList[index] = tablet.GetTimestampBytes()
 		typesList[index] = tablet.getDataTypes()
-		sizeList[index] = int32(tablet.rowCount)
+		sizeList[index] = int32(tablet.maxRowNumber)
 	}
 	request := rpc.TSInsertTabletsReq{
 		SessionId:        s.sessionId,
@@ -977,7 +977,7 @@ func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool) (*rpc.TSI
 			Values:       values,
 			Timestamps:   tablet.GetTimestampBytes(),
 			Types:        tablet.getDataTypes(),
-			Size:         int32(tablet.rowCount),
+			Size:         int32(tablet.maxRowNumber),
 			IsAligned:    &isAligned,
 		}
 		return request, nil
diff --git a/client/tablet.go b/client/tablet.go
index 4a89835..e32dc61 100644
--- a/client/tablet.go
+++ b/client/tablet.go
@@ -31,9 +31,6 @@ import (
 type MeasurementSchema struct {
 	Measurement string
 	DataType    TSDataType
-	Encoding    TSEncoding
-	Compressor  TSCompressionType
-	Properties  map[string]string
 }
 
 type Tablet struct {
@@ -41,11 +38,12 @@ type Tablet struct {
 	measurementSchemas []*MeasurementSchema
 	timestamps         []int64
 	values             []interface{}
-	rowCount           int
+	maxRowNumber       int
+	RowSize            int
 }
 
 func (t *Tablet) Len() int {
-	return t.GetRowCount()
+	return t.RowSize
 }
 
 func (t *Tablet) Swap(i, j int) {
@@ -91,7 +89,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error
 		return fmt.Errorf("illegal argument columnIndex %d", columnIndex)
 	}
 
-	if rowIndex < 0 || rowIndex > int(t.rowCount) {
+	if rowIndex < 0 || rowIndex > int(t.maxRowNumber) {
 		return fmt.Errorf("illegal argument rowIndex %d", rowIndex)
 	}
 
@@ -161,7 +159,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error
 }
 
 func (t *Tablet) GetRowCount() int {
-	return t.rowCount
+	return t.maxRowNumber
 }
 
 func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) {
@@ -169,7 +167,7 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) {
 		return nil, fmt.Errorf("illegal argument columnIndex %d", columnIndex)
 	}
 
-	if rowIndex < 0 || rowIndex > int(t.rowCount) {
+	if rowIndex < 0 || rowIndex > int(t.maxRowNumber) {
 		return nil, fmt.Errorf("illegal argument rowIndex %d", rowIndex)
 	}
 
@@ -221,17 +219,17 @@ func (t *Tablet) getValuesBytes() ([]byte, error) {
 	for i, schema := range t.measurementSchemas {
 		switch schema.DataType {
 		case BOOLEAN:
-			binary.Write(buff, binary.BigEndian, t.values[i].([]bool))
+			binary.Write(buff, binary.BigEndian, t.values[i].([]bool)[0:t.RowSize])
 		case INT32:
-			binary.Write(buff, binary.BigEndian, t.values[i].([]int32))
+			binary.Write(buff, binary.BigEndian, t.values[i].([]int32)[0:t.RowSize])
 		case INT64:
-			binary.Write(buff, binary.BigEndian, t.values[i].([]int64))
+			binary.Write(buff, binary.BigEndian, t.values[i].([]int64)[0:t.RowSize])
 		case FLOAT:
-			binary.Write(buff, binary.BigEndian, t.values[i].([]float32))
+			binary.Write(buff, binary.BigEndian, t.values[i].([]float32)[0:t.RowSize])
 		case DOUBLE:
-			binary.Write(buff, binary.BigEndian, t.values[i].([]float64))
+			binary.Write(buff, binary.BigEndian, t.values[i].([]float64)[0:t.RowSize])
 		case TEXT:
-			for _, s := range t.values[i].([]string) {
+			for _, s := range t.values[i].([]string)[0:t.RowSize] {
 				binary.Write(buff, binary.BigEndian, int32(len(s)))
 				binary.Write(buff, binary.BigEndian, []byte(s))
 			}
@@ -247,28 +245,32 @@ func (t *Tablet) Sort() error {
 	return nil
 }
 
-func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, rowCount int) (*Tablet, error) {
+func (t *Tablet) Reset() {
+	t.RowSize = 0
+}
+
+func NewTablet(deviceId string, measurementSchemas []*MeasurementSchema, maxRowNumber int) (*Tablet, error) {
 	tablet := &Tablet{
 		deviceId:           deviceId,
 		measurementSchemas: measurementSchemas,
-		rowCount:           rowCount,
+		maxRowNumber:       maxRowNumber,
 	}
-	tablet.timestamps = make([]int64, rowCount)
+	tablet.timestamps = make([]int64, maxRowNumber)
 	tablet.values = make([]interface{}, len(measurementSchemas))
 	for i, schema := range tablet.measurementSchemas {
 		switch schema.DataType {
 		case BOOLEAN:
-			tablet.values[i] = make([]bool, rowCount)
+			tablet.values[i] = make([]bool, maxRowNumber)
 		case INT32:
-			tablet.values[i] = make([]int32, rowCount)
+			tablet.values[i] = make([]int32, maxRowNumber)
 		case INT64:
-			tablet.values[i] = make([]int64, rowCount)
+			tablet.values[i] = make([]int64, maxRowNumber)
 		case FLOAT:
-			tablet.values[i] = make([]float32, rowCount)
+			tablet.values[i] = make([]float32, maxRowNumber)
 		case DOUBLE:
-			tablet.values[i] = make([]float64, rowCount)
+			tablet.values[i] = make([]float64, maxRowNumber)
 		case TEXT:
-			tablet.values[i] = make([]string, rowCount)
+			tablet.values[i] = make([]string, maxRowNumber)
 		default:
 			return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
 		}
diff --git a/client/tablet_test.go b/client/tablet_test.go
index 09fc6ca..dd95cfc 100644
--- a/client/tablet_test.go
+++ b/client/tablet_test.go
@@ -29,46 +29,22 @@ func createTablet(size int) (*Tablet, error) {
 		{
 			Measurement: "restart_count",
 			DataType:    INT32,
-			Encoding:    RLE,
-			Compressor:  SNAPPY,
-			Properties: map[string]string{
-				"owner": "Mark Liu",
-			},
 		}, {
 			Measurement: "price",
 			DataType:    DOUBLE,
-			Encoding:    GORILLA,
-			Compressor:  SNAPPY,
 		}, {
 			Measurement: "tick_count",
 			DataType:    INT64,
-			Encoding:    RLE,
-			Compressor:  SNAPPY,
 		}, {
 			Measurement: "temperature",
 			DataType:    FLOAT,
-			Encoding:    GORILLA,
-			Compressor:  SNAPPY,
-			Properties: map[string]string{
-				"owner": "Mark Liu",
-			},
 		}, {
 			Measurement: "description",
 			DataType:    TEXT,
-			Encoding:    PLAIN,
-			Compressor:  SNAPPY,
-			Properties: map[string]string{
-				"owner": "Mark Liu",
-			},
 		},
 		{
 			Measurement: "status",
 			DataType:    BOOLEAN,
-			Encoding:    RLE,
-			Compressor:  SNAPPY,
-			Properties: map[string]string{
-				"owner": "Mark Liu",
-			},
 		},
 	}, size)
 	return tablet, err
@@ -95,46 +71,22 @@ func TestTablet_getDataTypes(t *testing.T) {
 					{
 						Measurement: "restart_count",
 						DataType:    INT32,
-						Encoding:    RLE,
-						Compressor:  SNAPPY,
-						Properties: map[string]string{
-							"owner": "Mark Liu",
-						},
 					}, {
 						Measurement: "price",
 						DataType:    DOUBLE,
-						Encoding:    GORILLA,
-						Compressor:  SNAPPY,
 					}, {
 						Measurement: "tick_count",
 						DataType:    INT64,
-						Encoding:    RLE,
-						Compressor:  SNAPPY,
 					}, {
 						Measurement: "temperature",
 						DataType:    FLOAT,
-						Encoding:    GORILLA,
-						Compressor:  SNAPPY,
-						Properties: map[string]string{
-							"owner": "Mark Liu",
-						},
 					}, {
 						Measurement: "description",
 						DataType:    TEXT,
-						Encoding:    PLAIN,
-						Compressor:  SNAPPY,
-						Properties: map[string]string{
-							"owner": "Mark Liu",
-						},
 					},
 					{
 						Measurement: "status",
 						DataType:    BOOLEAN,
-						Encoding:    RLE,
-						Compressor:  SNAPPY,
-						Properties: map[string]string{
-							"owner": "Mark Liu",
-						},
 					},
 				},
 				timestamps: []int64{},
@@ -151,7 +103,7 @@ func TestTablet_getDataTypes(t *testing.T) {
 				measurementSchemas: tt.fields.measurementSchemas,
 				timestamps:         tt.fields.timestamps,
 				values:             tt.fields.values,
-				rowCount:           tt.fields.rowCount,
+				maxRowNumber:       tt.fields.rowCount,
 			}
 			if got := tablet.getDataTypes(); !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Tablet.getDataTypes() = %v, want %v", got, tt.want)
diff --git a/example/session_example.go b/example/session_example.go
index ce6a043..9d69ed7 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -475,6 +475,7 @@ func deleteData() {
 func insertTablet() {
 	if tablet, err := createTablet(12); err == nil {
 		status, err := session.InsertTablet(tablet, false)
+		tablet.Reset()
 		checkError(status, err)
 	} else {
 		log.Fatal(err)
@@ -484,6 +485,7 @@ func insertTablet() {
 func insertAlignedTablet() {
 	if tablet, err := createTablet(12); err == nil {
 		status, err := session.InsertAlignedTablet(tablet, false)
+		tablet.Reset()
 		checkError(status, err)
 	} else {
 		log.Fatal(err)
@@ -502,34 +504,22 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		{
 			Measurement: "restart_count",
 			DataType:    client.INT32,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "price",
 			DataType:    client.DOUBLE,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "tick_count",
 			DataType:    client.INT64,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "temperature",
 			DataType:    client.FLOAT,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "description",
 			DataType:    client.TEXT,
-			Encoding:    client.PLAIN,
-			Compressor:  client.SNAPPY,
 		},
 		{
 			Measurement: "status",
 			DataType:    client.BOOLEAN,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		},
 	}, rowCount)
 
@@ -546,6 +536,7 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		tablet.SetValueAt(rand.Float32(), 3, row)
 		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
 		tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+		tablet.RowSize++
 	}
 	return tablet, nil
 }
diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go
index c43a4fb..ecd630b 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -405,6 +405,7 @@ func insertTablet() {
 	if err == nil {
 		if tablet, err := createTablet(12); err == nil {
 			status, err := session.InsertTablet(tablet, false)
+			tablet.Reset()
 			checkError(status, err)
 		} else {
 			log.Fatal(err)
@@ -419,6 +420,7 @@ func insertAlignedTablet() {
 	if err == nil {
 		if tablet, err := createTablet(12); err == nil {
 			status, err := session.InsertAlignedTablet(tablet, false)
+			tablet.Reset()
 			checkError(status, err)
 		} else {
 			log.Fatal(err)
@@ -439,34 +441,22 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		{
 			Measurement: "restart_count",
 			DataType:    client.INT32,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "price",
 			DataType:    client.DOUBLE,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "tick_count",
 			DataType:    client.INT64,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "temperature",
 			DataType:    client.FLOAT,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "description",
 			DataType:    client.TEXT,
-			Encoding:    client.PLAIN,
-			Compressor:  client.SNAPPY,
 		},
 		{
 			Measurement: "status",
 			DataType:    client.BOOLEAN,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		},
 	}, rowCount)
 
@@ -483,6 +473,7 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		tablet.SetValueAt(rand.Float32(), 3, row)
 		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
 		tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+		tablet.RowSize++
 	}
 	return tablet, nil
 }
@@ -498,11 +489,16 @@ func insertTablets() {
 	}
 
 	tablets := []*client.Tablet{tablet1, tablet2}
+	tablet1.Reset()
+	tablet2.Reset()
 	session, err := sessionPool.GetSession()
 	defer sessionPool.PutBack(session)
 	if err == nil {
 		checkError(session.InsertTablets(tablets, false))
 	}
+	for _, tablet := range tablets {
+		tablet.Reset()
+	}
 
 }
 
@@ -522,6 +518,9 @@ func insertAlignedTablets() {
 	if err == nil {
 		checkError(session.InsertAlignedTablets(tablets, false))
 	}
+	for _, tablet := range tablets {
+		tablet.Reset()
+	}
 
 }
 
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index beebf76..77afce7 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -221,6 +221,7 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
 	if tablet, err := createTablet(12); err == nil {
 		status, err := s.session.InsertAlignedTablet(tablet, false)
 		s.checkError(status, err)
+		tablet.Reset()
 	} else {
 		log.Fatal(err)
 	}
@@ -240,34 +241,22 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		{
 			Measurement: "restart_count",
 			DataType:    client.INT32,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "price",
 			DataType:    client.DOUBLE,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "tick_count",
 			DataType:    client.INT64,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "temperature",
 			DataType:    client.FLOAT,
-			Encoding:    client.GORILLA,
-			Compressor:  client.SNAPPY,
 		}, {
 			Measurement: "description",
 			DataType:    client.TEXT,
-			Encoding:    client.PLAIN,
-			Compressor:  client.SNAPPY,
 		},
 		{
 			Measurement: "status",
 			DataType:    client.BOOLEAN,
-			Encoding:    client.RLE,
-			Compressor:  client.SNAPPY,
 		},
 	}, rowCount)
 
@@ -284,6 +273,7 @@ func createTablet(rowCount int) (*client.Tablet, error) {
 		tablet.SetValueAt(rand.Float32(), 3, row)
 		tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
 		tablet.SetValueAt(bool(ts%2 == 0), 5, row)
+		tablet.RowSize++
 	}
 	return tablet, nil
 }
@@ -302,6 +292,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablets() {
 
 	tablets := []*client.Tablet{tablet1, tablet2}
 	s.checkError(s.session.InsertAlignedTablets(tablets, false))
+	tablet1.Reset()
+	tablet2.Reset()
 
 	ds, err := s.session.ExecuteQueryStatement("select count(status) from root.ln.device1", nil)
 	assert := s.Require()