You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/05 14:08:49 UTC
[iotdb] 01/01: Fix desc batchdata count bug (#2186)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cherry-pick-2186
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e0aa4a26513cfd7b7bbe9473e9d3d7f0f600f8b
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Sat Dec 5 22:06:29 2020 +0800
Fix desc batchdata count bug (#2186)
remove Serializable interface on BatchData
Co-authored-by: LebronAl <TX...@gmail.com>
---
.../iotdb/db/query/reader/chunk/MemPageReader.java | 2 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 4 +-
.../iotdb/db/integration/IOTDBGroupByIT.java | 5 +
.../iotdb/db/integration/IoTDBAggregationIT.java | 5 +
.../integration/IoTDBAggregationLargeDataIT.java | 5 +
.../integration/IoTDBAggregationSmallDataIT.java | 7 +-
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 7 +-
.../iotdb/db/integration/IoTDBLargeDataIT.java | 5 +
.../IoTDBMultiOverlappedChunkInUnseqIT.java | 5 +
.../db/integration/IoTDBMultiOverlappedPageIT.java | 5 +
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 5 +
.../db/integration/IoTDBMultiStatementsIT.java | 5 +
.../db/integration/IoTDBOverlappedPageIT.java | 5 +
.../apache/iotdb/tsfile/read/common/BatchData.java | 35 +-
.../iotdb/tsfile/read/common/BatchDataFactory.java | 7 +-
.../{DescBatchData.java => DescReadBatchData.java} | 14 +-
.../tsfile/read/common/DescReadWriteBatchData.java | 374 +++++++++++++++++++++
.../iotdb/tsfile/read/reader/page/PageReader.java | 2 +-
18 files changed, 469 insertions(+), 28 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 2acaf05..f28ea2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -44,7 +44,7 @@ public class MemPageReader implements IPageReader {
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
BatchData batchData = BatchDataFactory
- .createBatchData(chunkMetadata.getDataType(), ascending);
+ .createBatchData(chunkMetadata.getDataType(), ascending, false);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
if (valueFilter == null || valueFilter
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8aa4176..b433bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -578,7 +578,8 @@ public class SeriesReader {
if (mergeReader.hasNextTimeValuePair()) {
- cachedBatchData = BatchDataFactory.createBatchData(dataType);
+ cachedBatchData = BatchDataFactory
+ .createBatchData(dataType, orderUtils.getAscending(), true);
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
if (firstPageReader != null) {
currentPageEndPointTime = orderUtils
@@ -653,6 +654,7 @@ public class SeriesReader {
timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
}
}
+ cachedBatchData.flip();
hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
/*
* if current overlapped page has valid data, return, otherwise read next overlapped page
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
index 330264e..f2989ec 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -104,6 +105,8 @@ public class IOTDBGroupByIT {
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1000);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -114,6 +117,8 @@ public class IOTDBGroupByIT {
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
index 16a3b05..3d8f1b5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -101,6 +102,8 @@ public class IoTDBAggregationIT {
EnvironmentUtils.closeStatMonitor();
prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1000);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData();
@@ -110,6 +113,8 @@ public class IoTDBAggregationIT {
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
//add test for part of points in page don't satisfy filter
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java
index ac9cc31..216ded2 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataIT.java
@@ -36,6 +36,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -117,6 +118,8 @@ public class IoTDBAggregationLargeDataIT {
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
EnvironmentUtils.envSetUp();
}
@@ -124,6 +127,8 @@ public class IoTDBAggregationLargeDataIT {
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
index b73ff06..a10ff29 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java
@@ -35,6 +35,8 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.jdbc.IoTDBSQLException;
@@ -123,7 +125,8 @@ public class IoTDBAggregationSmallDataIT {
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
-
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
//Thread.sleep(5000);
insertSQL();
}
@@ -131,6 +134,8 @@ public class IoTDBAggregationSmallDataIT {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index e4a319a..467754a 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.AfterClass;
@@ -108,7 +110,8 @@ public class IoTDBAlignByDeviceIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
-
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
insertData();
}
@@ -116,6 +119,8 @@ public class IoTDBAlignByDeviceIT {
@AfterClass
public static void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
private static void insertData() throws ClassNotFoundException {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
index a1ce633..d949d05 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
@@ -30,6 +30,7 @@ import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -54,6 +55,8 @@ public class IoTDBLargeDataIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
// use small page setting
// origin value
@@ -80,6 +83,8 @@ public class IoTDBLargeDataIT {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
EnvironmentUtils.cleanEnv();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java
index 62422ef..00c5c66 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedChunkInUnseqIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.AfterClass;
@@ -45,6 +46,8 @@ public class IoTDBMultiOverlappedChunkInUnseqIT {
@BeforeClass
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024);
EnvironmentUtils.envSetUp();
@@ -57,6 +60,8 @@ public class IoTDBMultiOverlappedChunkInUnseqIT {
// recovery value
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java
index 7819366..17dd4a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiOverlappedPageIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -48,6 +49,8 @@ public class IoTDBMultiOverlappedPageIT {
@BeforeClass
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
// max_number_of_points_in_page = 10
@@ -64,6 +67,8 @@ public class IoTDBMultiOverlappedPageIT {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
index e755b66..82d0442 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
@@ -30,6 +30,7 @@ import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -57,6 +58,8 @@ public class IoTDBMultiSeriesIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
// use small page setting
// origin value
@@ -88,6 +91,8 @@ public class IoTDBMultiSeriesIT {
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
TSFileDescriptor.getInstance().getConfig().setCompressor("SNAPPY");
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
private static void insertData()
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
index d155bd2..b6fecca 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiStatementsIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -51,6 +52,8 @@ public class IoTDBMultiStatementsIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
// use small page setting
// origin value
@@ -76,6 +79,8 @@ public class IoTDBMultiStatementsIT {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
EnvironmentUtils.cleanEnv();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
index bbfcf64..76131ad 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -48,6 +49,8 @@ public class IoTDBOverlappedPageIT {
@BeforeClass
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
// max_number_of_points_in_page = 10
@@ -64,6 +67,8 @@ public class IoTDBOverlappedPageIT {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
+ IoTDBDescriptor.getInstance().getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 88d921a..d17a789 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -18,10 +18,8 @@
*/
package org.apache.iotdb.tsfile.read.common;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.BiPredicate;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,10 +48,9 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong;
* while (batchData.hasCurrent()) { long time = batchData.currentTime(); Object value =
* batchData.currentValue(); batchData.next(); }
*/
-public class BatchData implements Serializable {
+public class BatchData {
- private static final long serialVersionUID = -4620310601188394839L;
- private static final int capacityThreshold = TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
+ public static final int CAPACITY_THRESHOLD = TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
protected int capacity = 16;
protected TSDataType dataType;
@@ -69,15 +66,15 @@ public class BatchData implements Serializable {
protected int writeCurArrayIndex;
// the insert timestamp number of timeRet
- private int count;
+ protected int count;
- private List<long[]> timeRet;
- private List<boolean[]> booleanRet;
- private List<int[]> intRet;
- private List<long[]> longRet;
- private List<float[]> floatRet;
- private List<double[]> doubleRet;
- private List<Binary[]> binaryRet;
+ protected List<long[]> timeRet;
+ protected List<boolean[]> booleanRet;
+ protected List<int[]> intRet;
+ protected List<long[]> longRet;
+ protected List<float[]> floatRet;
+ protected List<double[]> doubleRet;
+ protected List<Binary[]> binaryRet;
public BatchData() {
dataType = null;
@@ -217,7 +214,7 @@ public class BatchData implements Serializable {
*/
public void putBoolean(long t, boolean v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
booleanRet.add(new boolean[capacity]);
writeCurListIndex++;
@@ -252,7 +249,7 @@ public class BatchData implements Serializable {
*/
public void putInt(long t, int v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
intRet.add(new int[capacity]);
writeCurListIndex++;
@@ -287,7 +284,7 @@ public class BatchData implements Serializable {
*/
public void putLong(long t, long v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
longRet.add(new long[capacity]);
writeCurListIndex++;
@@ -322,7 +319,7 @@ public class BatchData implements Serializable {
*/
public void putFloat(long t, float v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
floatRet.add(new float[capacity]);
writeCurListIndex++;
@@ -357,7 +354,7 @@ public class BatchData implements Serializable {
*/
public void putDouble(long t, double v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
doubleRet.add(new double[capacity]);
writeCurListIndex++;
@@ -391,7 +388,7 @@ public class BatchData implements Serializable {
*/
public void putBinary(long t, Binary v) {
if (writeCurArrayIndex == capacity) {
- if (capacity >= capacityThreshold) {
+ if (capacity >= CAPACITY_THRESHOLD) {
timeRet.add(new long[capacity]);
binaryRet.add(new Binary[capacity]);
writeCurListIndex++;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
index 2f60bfe..b0cdeb2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
@@ -27,11 +27,14 @@ public class BatchDataFactory {
throw new IllegalStateException("Factory class");
}
- public static BatchData createBatchData(TSDataType dataType, boolean ascending) {
+ public static BatchData createBatchData(TSDataType dataType, boolean ascending, boolean isWriteDesc) {
if (ascending) {
return new BatchData(dataType);
+ } else if (isWriteDesc) {
+ return new DescReadWriteBatchData(dataType);
+ } else {
+ return new DescReadBatchData(dataType);
}
- return new DescBatchData(dataType);
}
public static BatchData createBatchData(TSDataType dataType) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
similarity index 84%
rename from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
rename to tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
index 55a795d..6da303c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadBatchData.java
@@ -21,9 +21,19 @@ package org.apache.iotdb.tsfile.read.common;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-public class DescBatchData extends BatchData {
+/**
+ * This class is just for reading batch data reversely. The data source is from page reader.
+ * For example,
+ * the timeRet from pageReader is [1, 1000],
+ * It will be written in ascending sequence, but the sequence of reading will be 1000 -> 1.
+ */
+public class DescReadBatchData extends BatchData {
+
+ public DescReadBatchData() {
+
+ }
- public DescBatchData(TSDataType dataType) {
+ public DescReadBatchData(TSDataType dataType) {
super(dataType);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
new file mode 100644
index 0000000..bbfe26d
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescReadWriteBatchData.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.common;
+
+import java.util.LinkedList;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+/**
+ * This class is for reading and writing batch data in reverse. The data source is from mergeReader.
+ * For example,
+ * the time sequence from mergeReader is 1000 -> 1, to keep the consistency that the timestamp
+ * should be ascending. It will be written in reverse, i.e. the timeRet will be [1, 1000].
+ * Then it can be handled the same as DescReadBatchData.
+ */
+public class DescReadWriteBatchData extends DescReadBatchData {
+
+ public DescReadWriteBatchData(TSDataType dataType) {
+ super();
+ this.dataType = dataType;
+ this.readCurListIndex = 0;
+ this.readCurArrayIndex = 0;
+ this.writeCurListIndex = 0;
+ this.writeCurArrayIndex = capacity - 1;
+
+ timeRet = new LinkedList<>();
+ timeRet.add(new long[capacity]);
+ count = 0;
+
+ switch (dataType) {
+ case BOOLEAN:
+ booleanRet = new LinkedList<>();
+ booleanRet.add(new boolean[capacity]);
+ break;
+ case INT32:
+ intRet = new LinkedList<>();
+ intRet.add(new int[capacity]);
+ break;
+ case INT64:
+ longRet = new LinkedList<>();
+ longRet.add(new long[capacity]);
+ break;
+ case FLOAT:
+ floatRet = new LinkedList<>();
+ floatRet.add(new float[capacity]);
+ break;
+ case DOUBLE:
+ doubleRet = new LinkedList<>();
+ doubleRet.add(new double[capacity]);
+ break;
+ case TEXT:
+ binaryRet = new LinkedList<>();
+ binaryRet.add(new Binary[capacity]);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+
+ /**
+ * put boolean data reversely.
+ *
+ * @param t timestamp
+ * @param v boolean data
+ */
+ @Override
+ public void putBoolean(long t, boolean v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) booleanRet).addFirst(new boolean[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ boolean[] newValueData = new boolean[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(booleanRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ booleanRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ booleanRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ /**
+ * put int data reversely.
+ *
+ * @param t timestamp
+ * @param v int data
+ */
+ @Override
+ public void putInt(long t, int v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) intRet).addFirst(new int[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ int[] newValueData = new int[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(intRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ intRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ intRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ /**
+ * put long data reversely.
+ *
+ * @param t timestamp
+ * @param v long data
+ */
+ @Override
+ public void putLong(long t, long v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) longRet).addFirst(new long[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ long[] newValueData = new long[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(longRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ longRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ longRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ /**
+ * put float data reversely.
+ *
+ * @param t timestamp
+ * @param v float data
+ */
+ @Override
+ public void putFloat(long t, float v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) floatRet).addFirst(new float[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ float[] newValueData = new float[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(floatRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ floatRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ floatRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ /**
+ * put double data reversely.
+ *
+ * @param t timestamp
+ * @param v double data
+ */
+ @Override
+ public void putDouble(long t, double v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) doubleRet).addFirst(new double[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ double[] newValueData = new double[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(doubleRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ doubleRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ doubleRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ /**
+ * put binary data reversely.
+ *
+ * @param t timestamp
+ * @param v binary data.
+ */
+ @Override
+ public void putBinary(long t, Binary v) {
+ if (writeCurArrayIndex == -1) {
+ if (capacity >= CAPACITY_THRESHOLD) {
+ ((LinkedList) timeRet).addFirst(new long[capacity]);
+ ((LinkedList) binaryRet).addFirst(new Binary[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = capacity - 1;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ Binary[] newValueData = new Binary[newCapacity];
+
+ System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity);
+ System.arraycopy(binaryRet.get(0), 0, newValueData, newCapacity - capacity, capacity);
+
+ timeRet.set(0, newTimeData);
+ binaryRet.set(0, newValueData);
+
+ writeCurArrayIndex = newCapacity - capacity - 1;
+ capacity = newCapacity;
+ }
+ }
+ timeRet.get(0)[writeCurArrayIndex] = t;
+ binaryRet.get(0)[writeCurArrayIndex] = v;
+
+ writeCurArrayIndex--;
+ count++;
+ }
+
+ @Override
+ public boolean hasCurrent() {
+ return (readCurListIndex == 0 && readCurArrayIndex > writeCurArrayIndex) || (
+ readCurListIndex > 0 && readCurArrayIndex >= 0);
+ }
+
+ @Override
+ public void next() {
+ super.readCurArrayIndex--;
+ if ((readCurListIndex == 0 && readCurArrayIndex <= writeCurArrayIndex)
+ || readCurArrayIndex == -1) {
+ super.readCurListIndex--;
+ super.readCurArrayIndex = capacity - 1;
+ }
+ }
+
+ @Override
+ public void resetBatchData() {
+ super.readCurArrayIndex = capacity - 1;
+ super.readCurListIndex = writeCurListIndex;
+ }
+
+ @Override
+ public long getTimeByIndex(int idx) {
+ return timeRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public long getLongByIndex(int idx) {
+ return longRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public double getDoubleByIndex(int idx) {
+ return doubleRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public int getIntByIndex(int idx) {
+ return intRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public float getFloatByIndex(int idx) {
+ return floatRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public Binary getBinaryByIndex(int idx) {
+ return binaryRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ @Override
+ public boolean getBooleanByIndex(int idx) {
+ return booleanRet.get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1)
+ % capacity];
+ }
+
+ /**
+ * Read: When put data, the writeIndex increases while the readIndex remains 0.
+ * For descending read, we need to read from writeIndex to writeCurArrayIndex
+ */
+ @Override
+ public BatchData flip() {
+ super.readCurArrayIndex = capacity - 1;
+ super.readCurListIndex = writeCurListIndex;
+ return this;
+ }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index efc3b01..a6f7211 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -107,7 +107,7 @@ public class PageReader implements IPageReader {
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
- BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending);
+ BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false);
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);