You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/05 12:07:05 UTC

[iotdb] branch rel/0.11 updated: fix compaction bug after write data by deserialize merge (#2195)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 51dc09d  fix compaction bug after write data by deserialize merge (#2195)
51dc09d is described below

commit 51dc09d268426e0a339bcde6066f2f6b6a5adb2d
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Dec 5 20:06:46 2020 +0800

    fix compaction bug after write data by deserialize merge (#2195)
---
 .../engine/compaction/utils/CompactionUtils.java   | 12 ++++-
 .../iotdb/db/integration/IoTDBCompactionIT.java    | 56 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index d44234f..80c31cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -36,6 +36,9 @@ import java.util.TreeMap;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -141,8 +144,13 @@ public class CompactionUtils {
     if (chunkMetadataList.isEmpty()) {
       return maxVersion;
     }
-    IChunkWriter chunkWriter = new ChunkWriterImpl(
-        new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()));
+    IChunkWriter chunkWriter;
+    try {
+      chunkWriter = new ChunkWriterImpl(
+          IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()));
+    } catch (MetadataException e) {
+      throw new IOException(e);
+    }
     for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
       writeTVPair(timeValuePair, chunkWriter);
       targetResource.updateStartTime(device, timeValuePair.getTimestamp());
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
index d3cc788..e9779da 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
@@ -95,5 +95,61 @@ public class IoTDBCompactionIT {
       assertEquals(32, cnt);
     }
   }
+  
+  @Test
+  public void testAppendMergeAfterDeserializeMerge() throws SQLException {
+    boolean prevEnableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig()
+        .isEnableUnseqCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.compactionTest");
+      try {
+        statement.execute("CREATE TIMESERIES root.compactionTest.s1 WITH DATATYPE=INT64");
+      } catch (SQLException e) {
+        // ignore
+      }
+
+      long pageSize = 100;
+      long timestamp = 1;
+
+      for (long row = 0; row < 10000; row++) {
+        statement
+            .execute(
+                String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+                    timestamp, 1));
+        if (row % pageSize == 0) {
+          statement.execute("FLUSH");
+        }
+        timestamp++;
+      }
+
+      timestamp = 8322;
 
+      for (long row = 0; row < 2400; row++) {
+        statement
+            .execute(
+                String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+                    timestamp, 1));
+        if (row % pageSize == 0) {
+          statement.execute("FLUSH");
+        }
+        timestamp++;
+      }
+
+      int cnt;
+      try (ResultSet resultSet = statement
+          .executeQuery("SELECT COUNT(s1) FROM root.compactionTest")) {
+        cnt = 0;
+        while (resultSet.next()) {
+          System.out.println(resultSet.getLong(1));
+          assertEquals(10721, resultSet.getLong(1));
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    }
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction);
+  }
 }