You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/05 09:52:55 UTC

[iotdb] branch master updated: [ISSUE-2187] Fix compaction encoding bug after deserialization merge (#2188)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e652563  [ISSUE-2187] Fix compaction encoding bug after deserialization merge (#2188)
e652563 is described below

commit e6525637c9956133301408a9db9b45733f6ac23e
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Dec 5 17:52:35 2020 +0800

    [ISSUE-2187] Fix compaction encoding bug after deserialization merge (#2188)
    
    * fix compaction bug that occurs after data is written by a deserialize merge
---
 .../engine/compaction/utils/CompactionUtils.java   | 21 ++++++---
 .../iotdb/db/integration/IoTDBCompactionIT.java    | 55 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 9e34b61..0f76436 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -36,6 +36,9 @@ import java.util.TreeMap;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -47,7 +50,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,8 +143,13 @@ public class CompactionUtils {
     if (chunkMetadataList.isEmpty()) {
       return maxVersion;
     }
-    IChunkWriter chunkWriter = new ChunkWriterImpl(
-        new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()), true);
+    IChunkWriter chunkWriter;
+    try {
+      chunkWriter = new ChunkWriterImpl(
+          IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()), true);
+    } catch (MetadataException e) {
+      throw new IOException(e);
+    }
     for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
       writeTVPair(timeValuePair, chunkWriter);
       targetResource.updateStartTime(device, timeValuePair.getTimestamp());
@@ -172,11 +179,11 @@ public class CompactionUtils {
   }
 
   /**
-   * @param targetResource the target resource to be merged to
-   * @param tsFileResources the source resource to be merged
-   * @param storageGroup the storage group name
+   * @param targetResource   the target resource to be merged to
+   * @param tsFileResources  the source resource to be merged
+   * @param storageGroup     the storage group name
    * @param compactionLogger the logger
-   * @param devices the devices to be skipped(used by recover)
+   * @param devices          the devices to be skipped(used by recover)
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public static void merge(TsFileResource targetResource,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
index d3cc788..17265ce 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Random;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -96,4 +97,58 @@ public class IoTDBCompactionIT {
     }
   }
 
+  /**
+   * Writes timestamps 1..10000, then 8322..10721 (FLUSH whenever row % 100 == 0) with unseq compaction
+   * disabled, and expects COUNT(s1) to return exactly one row whose value is 10721.
+   */
+  @Test
+  public void testAppendMergeAfterDeserializeMerge() throws SQLException {
+    boolean prevEnableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.compactionTest");
+      try {
+        statement.execute("CREATE TIMESERIES root.compactionTest.s1 WITH DATATYPE=INT64");
+      } catch (SQLException e) {
+        // ignore: a failure to create the series is deliberately tolerated here
+      }
+      long pageSize = 100;
+      long timestamp = 1;
+      for (long row = 0; row < 10000; row++) {
+        statement.execute(
+            String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)", timestamp, 1));
+        if (row % pageSize == 0) {
+          statement.execute("FLUSH");
+        }
+        timestamp++;
+      }
+
+      // rewind: the second batch re-writes timestamps 8322..10000 and then extends up to 10721
+      timestamp = 8322;
+      for (long row = 0; row < 2400; row++) {
+        statement.execute(
+            String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)", timestamp, 1));
+        if (row % pageSize == 0) {
+          statement.execute("FLUSH");
+        }
+        timestamp++;
+      }
+
+      int cnt = 0;
+      try (ResultSet resultSet = statement.executeQuery("SELECT COUNT(s1) FROM root.compactionTest")) {
+        while (resultSet.next()) {
+          assertEquals(10721, resultSet.getLong(1));
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    } finally {
+      // restore the global setting even when a statement or assertion above fails
+      IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction);
+    }
+  }
+
+
 }