You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/01/26 07:07:55 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11]cherry pick fix unseq
merge end time bug (#2569)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 8998ff1 [To rel/0.11]cherry pick fix unseq merge end time bug (#2569)
8998ff1 is described below
commit 8998ff14aed9a8ca0572c036008b8e5606e70233
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Jan 26 15:07:16 2021 +0800
[To rel/0.11]cherry pick fix unseq merge end time bug (#2569)
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../iotdb/db/engine/merge/task/MergeFileTask.java | 15 ++++-
.../db/engine/merge/task/MergeMultiChunkTask.java | 2 +-
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 2 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 65 +++++++++++++++-------
.../iotdb/db/integration/IoTDBMergeTest.java | 51 ++++++++++++++++-
5 files changed, 108 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 673ac34..367dd6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -177,6 +177,7 @@ class MergeFileTask {
}
}
}
+ updateStartTimeAndEndTime(seqFile, oldFileWriter);
oldFileWriter.endFile();
updatePlanIndexes(seqFile);
@@ -191,6 +192,18 @@ class MergeFileTask {
}
}
+ private void updateStartTimeAndEndTime(TsFileResource seqFile, TsFileIOWriter fileWriter) { // feeds the time range of every chunk known to fileWriter into seqFile, device by device; both visible call sites run this right before endFile()
+ // TODO: change to get one timeseries block each time (the loop below walks the writer's entire device -> chunk-metadata map in one pass)
+ for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataEntry : fileWriter
+ .getDeviceChunkMetadataMap().entrySet()) {
+ String device = deviceChunkMetadataEntry.getKey(); // the map key is passed on as the device identifier
+ for (ChunkMetadata chunkMetadata : deviceChunkMetadataEntry.getValue()) {
+ seqFile.updateStartTime(device, chunkMetadata.getStartTime()); // NOTE(review): presumably keeps the smallest start time per device -- confirm in TsFileResource
+ seqFile.updateEndTime(device, chunkMetadata.getEndTime()); // NOTE(review): presumably keeps the largest end time per device -- confirm in TsFileResource
+ }
+ }
+ }
+
/**
* Restore an old seq file which is being written new chunks when exceptions occur or the task is
* aborted.
@@ -287,7 +300,7 @@ class MergeFileTask {
fileWriter.endChunkGroup();
}
}
-
+ updateStartTimeAndEndTime(seqFile, fileWriter);
fileWriter.endFile();
updatePlanIndexes(seqFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 5d47170..b1b03d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -230,7 +230,7 @@ public class MergeMultiChunkTask {
// series should also be written into a new chunk
List<Integer> ret = new ArrayList<>();
for (int i = 0; i < currMergingPaths.size(); i++) {
- if (seqChunkMeta[i].isEmpty()
+ if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()
&& !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePairs[i] != null)) {
continue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index ce063f4..c9070f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -217,7 +217,7 @@ public class MergeUtils {
public static boolean isChunkOverflowed(TimeValuePair timeValuePair, ChunkMetadata metaData) { // false when timeValuePair is null; otherwise decided by comparing its timestamp with the chunk's end time
return timeValuePair != null
- && timeValuePair.getTimestamp() < metaData.getEndTime(); // removed: strict comparison, a timestamp equal to the chunk end time returned false
+ && timeValuePair.getTimestamp() <= metaData.getEndTime(); // added: inclusive comparison, a timestamp equal to the chunk end time now returns true
}
public static boolean isChunkTooSmall(int ptWritten, ChunkMetadata chunkMetaData,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index fb73a27..4ecae39 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -50,7 +50,8 @@ public class MergeTaskTest extends MergeTest {
private File tempSGDir;
@Before
- public void setUp() throws IOException, WriteProcessException, MetadataException, MetadataException {
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException, MetadataException {
super.setUp();
tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
tempSGDir.mkdirs();
@@ -71,11 +72,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> list = new ArrayList<>();
list.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
- list, new ArrayList<>(), null, null, true);
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context, list, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
@@ -86,6 +88,18 @@ public class MergeTaskTest extends MergeTest {
}
@Test
+ public void testMergeEndTime() throws Exception { // merges a subset of the fixture files and checks the end time recorded for one device on the third sequence file
+ List<TsFileResource> testSeqResources = seqResources.subList(0, 3); // sequence files at indices 0, 1 and 2
+ List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6); // only the unsequence file at index 5
+ MergeTask mergeTask =
+ new MergeTask(new MergeResource(testSeqResources, testUnseqResource), tempSGDir.getPath(),
+ (k, v, l) -> { // NOTE(review): k is presumably the list of sequence files handed to the merge callback -- confirm against MergeTask
+ assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1")); // the element at index 2 must report end time 499 for this device
+ }, "test", false, 1, MERGE_TEST_SG);
+ mergeTask.call(); // runs the merge; the assertion above lives inside the callback passed to the task
+ }
+
+ @Test
public void testFullMerge() throws Exception {
MergeTask mergeTask =
new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(),
@@ -95,10 +109,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> list = new ArrayList<>();
list.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context,
list, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
@@ -120,10 +136,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> resources = new ArrayList<>();
resources.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context,
resources, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
@@ -144,11 +162,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> list = new ArrayList<>();
list.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
- list, new ArrayList<>(), null, null, true);
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context, list, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
@@ -172,11 +191,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> list = new ArrayList<>();
list.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
- list, new ArrayList<>(), null, null, true);
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context, list, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
@@ -196,11 +216,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> list = new ArrayList<>();
list.add(seqResources.get(2));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
- list, new ArrayList<>(), null, null, true);
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context, list, new ArrayList<>(), null, null, true);
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
@@ -218,7 +239,8 @@ public class MergeTaskTest extends MergeTest {
public void mergeWithDeletionTest() throws Exception {
try {
PartialPath device = new PartialPath(deviceIds[0]);
- seqResources.get(0).getModFile().write(new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
+ seqResources.get(0).getModFile().write(
+ new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
} finally {
seqResources.get(0).getModFile().close();
}
@@ -236,11 +258,12 @@ public class MergeTaskTest extends MergeTest {
mergeTask.call();
QueryContext context = new QueryContext();
- PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+ PartialPath path = new PartialPath(
+ deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
List<TsFileResource> resources = new ArrayList<>();
resources.add(seqResources.get(0));
- IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
- resources, new ArrayList<>(), null, null, true);
+ IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+ context, resources, new ArrayList<>(), null, null, true);
int count = 0;
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 3c86a2f..500c9ce 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -51,7 +51,7 @@ public class IoTDBMergeTest {
.getCompactionStrategy();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1);
IoTDBDescriptor.getInstance().getConfig()
- .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
}
@@ -65,6 +65,52 @@ public class IoTDBMergeTest {
}
@Test
+ public void testOverlap() throws SQLException { // writes three flushed batches whose timestamps overlap, then checks the query result row by row
+ logger.info("test...");
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.mergeTest");
+ try {
+ statement.execute("CREATE TIMESERIES root.mergeTest.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+ } catch (SQLException e) {
+ // ignore (NOTE(review): presumably tolerates a timeseries that already exists -- confirm)
+ }
+
+ statement.execute(String // batch 1: timestamps 1 and 2, value equals timestamp
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 1, 1));
+ statement.execute(String
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 2));
+ statement.execute("FLUSH");
+ statement.execute(String // batch 2: timestamps 5 and 6, value equals timestamp
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 5, 5));
+ statement.execute(String
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 6, 6));
+ statement.execute("FLUSH");
+ statement.execute(String // batch 3: timestamp 2 is written again with value 3, plus a new timestamp 3; both are earlier than the already flushed 5 and 6
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 3));
+ statement.execute(String
+ .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 3, 3));
+ statement.execute("FLUSH");
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+ int cnt = 0; // number of rows returned
+ while (resultSet.next()) {
+ long time = resultSet.getLong("Time");
+ long s1 = resultSet.getLong("root.mergeTest.s1");
+ if (time == 2) {
+ assertEquals(3, s1); // the rewritten point must win over the original value 2
+ } else {
+ assertEquals(time, s1); // every other point was inserted with value == timestamp
+ }
+ cnt++;
+ }
+ assertEquals(5, cnt); // distinct timestamps written: 1, 2, 3, 5, 6
+ }
+ }
+ }
+
+ @Test
public void test() throws SQLException {
logger.info("test...");
try (Connection connection = DriverManager
@@ -94,8 +140,7 @@ public class IoTDBMergeTest {
+ "%d,%d)", j, j + 10, j + 20, j + 30));
}
statement.execute("FLUSH");
- statement.execute("MERGE");
- try{
+ try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();