You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/05 12:07:05 UTC
[iotdb] branch rel/0.11 updated: fix compaction bug after write
data by deserialize merge (#2195)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 51dc09d fix compaction bug after write data by deserialize merge (#2195)
51dc09d is described below
commit 51dc09d268426e0a339bcde6066f2f6b6a5adb2d
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Dec 5 20:06:46 2020 +0800
fix compaction bug after write data by deserialize merge (#2195)
---
.../engine/compaction/utils/CompactionUtils.java | 12 ++++-
.../iotdb/db/integration/IoTDBCompactionIT.java | 56 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index d44234f..80c31cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -36,6 +36,9 @@ import java.util.TreeMap;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -141,8 +144,13 @@ public class CompactionUtils {
if (chunkMetadataList.isEmpty()) {
return maxVersion;
}
- IChunkWriter chunkWriter = new ChunkWriterImpl(
- new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()));
+ IChunkWriter chunkWriter;
+ try {
+ chunkWriter = new ChunkWriterImpl(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()));
+ } catch (MetadataException e) {
+ throw new IOException(e);
+ }
for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
writeTVPair(timeValuePair, chunkWriter);
targetResource.updateStartTime(device, timeValuePair.getTimestamp());
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
index d3cc788..e9779da 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
@@ -95,5 +95,61 @@ public class IoTDBCompactionIT {
assertEquals(32, cnt);
}
}
+
+ @Test
+ public void testAppendMergeAfterDeserializeMerge() throws SQLException {
+ boolean prevEnableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig()
+ .isEnableUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.compactionTest");
+ try {
+ statement.execute("CREATE TIMESERIES root.compactionTest.s1 WITH DATATYPE=INT64");
+ } catch (SQLException e) {
+ // ignore
+ }
+
+ long pageSize = 100;
+ long timestamp = 1;
+
+ for (long row = 0; row < 10000; row++) {
+ statement
+ .execute(
+ String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+ timestamp, 1));
+ if (row % pageSize == 0) {
+ statement.execute("FLUSH");
+ }
+ timestamp++;
+ }
+
+ timestamp = 8322;
+ for (long row = 0; row < 2400; row++) {
+ statement
+ .execute(
+ String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+ timestamp, 1));
+ if (row % pageSize == 0) {
+ statement.execute("FLUSH");
+ }
+ timestamp++;
+ }
+
+ int cnt;
+ try (ResultSet resultSet = statement
+ .executeQuery("SELECT COUNT(s1) FROM root.compactionTest")) {
+ cnt = 0;
+ while (resultSet.next()) {
+ System.out.println(resultSet.getLong(1));
+ assertEquals(10721, resultSet.getLong(1));
+ cnt++;
+ }
+ }
+ assertEquals(1, cnt);
+ }
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction);
+ }
}