You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/08/20 03:13:10 UTC

[incubator-iotdb] branch master updated: Fix bugs in RowBatch

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc03f2d  Fix bugs in RowBatch
     new 1077647  Merge pull request #344 from jack870131/support_batched_ingestion
fc03f2d is described below

commit fc03f2d6330eeb81d72c30a84ccc722080536d68
Author: jack870131 <ja...@outlook.com>
AuthorDate: Tue Aug 20 10:20:03 2019 +0800

    Fix bugs in RowBatch
---
 docs/Documentation/UserGuide/7-TsFile/2-Usage.md   |  8 +++----
 .../iotdb/tsfile/TsFileWriteWithRowBatch.java      |  6 ++---
 .../tsfile/write/chunk/ChunkGroupWriterImpl.java   | 13 +++++-----
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 28 +++++++++++-----------
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     | 14 +++++------
 .../apache/iotdb/tsfile/write/page/PageWriter.java | 28 +++++++++++-----------
 .../iotdb/tsfile/write/TsFileReadWriteTest.java    |  2 +-
 7 files changed, 50 insertions(+), 49 deletions(-)

diff --git a/docs/Documentation/UserGuide/7-TsFile/2-Usage.md b/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
index 758fc13..bcb8a63 100644
--- a/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
+++ b/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
@@ -208,7 +208,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  * It uses the interface:
  * public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
  */
-public class TsFileWrite {
+public class TsFileWriteWithTSRecord {
 
   public static void main(String args[]) {
     try {
@@ -295,7 +295,7 @@ public class TsFileWriteWithRowBatch {
       RowBatch rowBatch = fileSchema.createRowBatch("device_1");
 
       long[] timestamps = rowBatch.timestamps;
-      Object[] sensors = rowBatch.sensors;
+      Object[] values = rowBatch.values;
 
       long timestamp = 1;
       long value = 1000000L;
@@ -304,11 +304,11 @@ public class TsFileWriteWithRowBatch {
         int row = rowBatch.batchSize++;
         timestamps[row] = timestamp++;
         for (int i = 0; i < sensorNum; i++) {
-          long[] sensor = (long[]) sensors[i];
+          long[] sensor = (long[]) values[i];
           sensor[row] = value;
         }
         // write RowBatch to TsFile
-        if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
+        if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
           tsFileWriter.write(rowBatch);
           rowBatch.reset();
         }
diff --git a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
index 0348ec5..416ef74 100644
--- a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
+++ b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
@@ -59,7 +59,7 @@ public class TsFileWriteWithRowBatch {
       RowBatch rowBatch = fileSchema.createRowBatch("device_1");
 
       long[] timestamps = rowBatch.timestamps;
-      Object[] sensors = rowBatch.sensors;
+      Object[] values = rowBatch.values;
 
       long timestamp = 1;
       long value = 1000000L;
@@ -68,11 +68,11 @@ public class TsFileWriteWithRowBatch {
         int row = rowBatch.batchSize++;
         timestamps[row] = timestamp++;
         for (int i = 0; i < sensorNum; i++) {
-          long[] sensor = (long[]) sensors[i];
+          long[] sensor = (long[]) values[i];
           sensor[row] = value;
         }
         // write RowBatch to TsFile
-        if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
+        if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
           tsFileWriter.write(rowBatch);
           rowBatch.reset();
         }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
index 3b20664..9efcff1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
@@ -91,24 +91,25 @@ public class ChunkGroupWriterImpl implements IChunkGroupWriter {
 
   private void writeByDataType(
           RowBatch rowBatch, String measurementId, TSDataType dataType, int index) throws IOException {
+    int batchSize = rowBatch.batchSize;
     switch (dataType) {
       case INT32:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index], batchSize);
         break;
       case INT64:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index], batchSize);
         break;
       case FLOAT:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index], batchSize);
         break;
       case DOUBLE:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index], batchSize);
         break;
       case BOOLEAN:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index], batchSize);
         break;
       case TEXT:
-        chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index]);
+        chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index], batchSize);
         break;
       default:
         throw new UnSupportedDataTypeException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index bab345e..3ca4d9f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -200,85 +200,85 @@ public class ChunkWriterImpl implements IChunkWriter {
   }
 
   @Override
-  public void write(long[] timestamps, int[] values) {
+  public void write(long[] timestamps, int[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, long[] values) {
+  public void write(long[] timestamps, long[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, boolean[] values) {
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, float[] values) {
+  public void write(long[] timestamps, float[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, double[] values) {
+  public void write(long[] timestamps, double[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, BigDecimal[] values) {
+  public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
 
   @Override
-  public void write(long[] timestamps, Binary[] values) {
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
     this.time = timestamps[timestamps.length - 1];
     valueCountInOnePage += timestamps.length;
     if (minTimestamp == Long.MIN_VALUE) {
       minTimestamp = timestamps[0];
     }
-    dataPageWriter.write(timestamps, values);
+    dataPageWriter.write(timestamps, values, batchSize);
     pageStatistics.updateStats(values);
     checkPageSizeAndMayOpenANewPage();
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
index 8fd601e..616ef77 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
@@ -68,37 +68,37 @@ public interface IChunkWriter {
   /**
    * write time series
    */
-  void write(long[] timestamps, int[] values);
+  void write(long[] timestamps, int[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, long[] values);
+  void write(long[] timestamps, long[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, boolean[] values);
+  void write(long[] timestamps, boolean[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, float[] values);
+  void write(long[] timestamps, float[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, double[] values);
+  void write(long[] timestamps, double[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, BigDecimal[] values);
+  void write(long[] timestamps, BigDecimal[] values, int batchSize);
 
   /**
    * write time series
    */
-  void write(long[] timestamps, Binary[] values);
+  void write(long[] timestamps, Binary[] values, int batchSize);
 
   /**
    * flush data to TsFileIOWriter.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 2aad435..25f9e48 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -124,8 +124,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, boolean[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, boolean[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -134,8 +134,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, int[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, int[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -144,8 +144,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, long[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, long[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -154,8 +154,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, float[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, float[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -164,8 +164,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, double[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, double[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -174,8 +174,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, BigDecimal[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
@@ -184,8 +184,8 @@ public class PageWriter {
   /**
    * write time series into encoder
    */
-  public void write(long[] timestamps, Binary[] values) {
-    for (int i = 0; i < timestamps.length; i++) {
+  public void write(long[] timestamps, Binary[] values, int batchSize) {
+    for (int i = 0; i < batchSize; i++) {
       timeEncoder.encode(timestamps[i], timeOut);
       valueEncoder.encode(values[i], valueOut);
     }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
index f72e11c..8d384c0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
@@ -155,7 +155,7 @@ public class TsFileReadWriteTest {
     FileSchema fileSchema = new FileSchema();
     fileSchema.registerMeasurement(
             new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
-    int rowNum = 1024 * 1024;
+    int rowNum = 1024 * 1024 * 13 + 1023;
     int sensorNum = 1;
     TsFileWriter tsFileWriter = new TsFileWriter(f, fileSchema);
     RowBatch rowBatch = fileSchema.createRowBatch("device_1");