You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/08/20 03:13:10 UTC
[incubator-iotdb] branch master updated: Fix bugs in RowBatch
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fc03f2d Fix bugs in RowBatch
new 1077647 Merge pull request #344 from jack870131/support_batched_ingestion
fc03f2d is described below
commit fc03f2d6330eeb81d72c30a84ccc722080536d68
Author: jack870131 <ja...@outlook.com>
AuthorDate: Tue Aug 20 10:20:03 2019 +0800
Fix bugs in RowBatch
---
docs/Documentation/UserGuide/7-TsFile/2-Usage.md | 8 +++----
.../iotdb/tsfile/TsFileWriteWithRowBatch.java | 6 ++---
.../tsfile/write/chunk/ChunkGroupWriterImpl.java | 13 +++++-----
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 28 +++++++++++-----------
.../iotdb/tsfile/write/chunk/IChunkWriter.java | 14 +++++------
.../apache/iotdb/tsfile/write/page/PageWriter.java | 28 +++++++++++-----------
.../iotdb/tsfile/write/TsFileReadWriteTest.java | 2 +-
7 files changed, 50 insertions(+), 49 deletions(-)
diff --git a/docs/Documentation/UserGuide/7-TsFile/2-Usage.md b/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
index 758fc13..bcb8a63 100644
--- a/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
+++ b/docs/Documentation/UserGuide/7-TsFile/2-Usage.md
@@ -208,7 +208,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
* It uses the interface:
* public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
*/
-public class TsFileWrite {
+public class TsFileWriteWithTSRecord {
public static void main(String args[]) {
try {
@@ -295,7 +295,7 @@ public class TsFileWriteWithRowBatch {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
long[] timestamps = rowBatch.timestamps;
- Object[] sensors = rowBatch.sensors;
+ Object[] values = rowBatch.values;
long timestamp = 1;
long value = 1000000L;
@@ -304,11 +304,11 @@ public class TsFileWriteWithRowBatch {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
- long[] sensor = (long[]) sensors[i];
+ long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
- if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
+ if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
diff --git a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
index 0348ec5..416ef74 100644
--- a/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
+++ b/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java
@@ -59,7 +59,7 @@ public class TsFileWriteWithRowBatch {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
long[] timestamps = rowBatch.timestamps;
- Object[] sensors = rowBatch.sensors;
+ Object[] values = rowBatch.values;
long timestamp = 1;
long value = 1000000L;
@@ -68,11 +68,11 @@ public class TsFileWriteWithRowBatch {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
- long[] sensor = (long[]) sensors[i];
+ long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
- if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
+ if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
index 3b20664..9efcff1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
@@ -91,24 +91,25 @@ public class ChunkGroupWriterImpl implements IChunkGroupWriter {
private void writeByDataType(
RowBatch rowBatch, String measurementId, TSDataType dataType, int index) throws IOException {
+ int batchSize = rowBatch.batchSize;
switch (dataType) {
case INT32:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index], batchSize);
break;
case INT64:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index], batchSize);
break;
case FLOAT:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index], batchSize);
break;
case DOUBLE:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index], batchSize);
break;
case BOOLEAN:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index], batchSize);
break;
case TEXT:
- chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index]);
+ chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index], batchSize);
break;
default:
throw new UnSupportedDataTypeException(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index bab345e..3ca4d9f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -200,85 +200,85 @@ public class ChunkWriterImpl implements IChunkWriter {
}
@Override
- public void write(long[] timestamps, int[] values) {
+ public void write(long[] timestamps, int[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, long[] values) {
+ public void write(long[] timestamps, long[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, boolean[] values) {
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, float[] values) {
+ public void write(long[] timestamps, float[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, double[] values) {
+ public void write(long[] timestamps, double[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, BigDecimal[] values) {
+ public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
@Override
- public void write(long[] timestamps, Binary[] values) {
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
- dataPageWriter.write(timestamps, values);
+ dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
index 8fd601e..616ef77 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java
@@ -68,37 +68,37 @@ public interface IChunkWriter {
/**
* write time series
*/
- void write(long[] timestamps, int[] values);
+ void write(long[] timestamps, int[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, long[] values);
+ void write(long[] timestamps, long[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, boolean[] values);
+ void write(long[] timestamps, boolean[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, float[] values);
+ void write(long[] timestamps, float[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, double[] values);
+ void write(long[] timestamps, double[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, BigDecimal[] values);
+ void write(long[] timestamps, BigDecimal[] values, int batchSize);
/**
* write time series
*/
- void write(long[] timestamps, Binary[] values);
+ void write(long[] timestamps, Binary[] values, int batchSize);
/**
* flush data to TsFileIOWriter.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
index 2aad435..25f9e48 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/page/PageWriter.java
@@ -124,8 +124,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, boolean[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, boolean[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -134,8 +134,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, int[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, int[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -144,8 +144,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, long[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, long[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -154,8 +154,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, float[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, float[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -164,8 +164,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, double[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, double[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -174,8 +174,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, BigDecimal[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
@@ -184,8 +184,8 @@ public class PageWriter {
/**
* write time series into encoder
*/
- public void write(long[] timestamps, Binary[] values) {
- for (int i = 0; i < timestamps.length; i++) {
+ public void write(long[] timestamps, Binary[] values, int batchSize) {
+ for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
index f72e11c..8d384c0 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileReadWriteTest.java
@@ -155,7 +155,7 @@ public class TsFileReadWriteTest {
FileSchema fileSchema = new FileSchema();
fileSchema.registerMeasurement(
new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
- int rowNum = 1024 * 1024;
+ int rowNum = 1024 * 1024 * 13 + 1023;
int sensorNum = 1;
TsFileWriter tsFileWriter = new TsFileWriter(f, fileSchema);
RowBatch rowBatch = fileSchema.createRowBatch("device_1");