You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/01/15 02:43:26 UTC

[incubator-iotdb] branch fix_421 created (now 47ff293)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch fix_421
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 47ff293  fix that selected seq files for merge are not sorted

This branch includes the following new commits:

     new 47ff293  fix that selected seq files for merge are not sorted

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix that selected seq files for merge are not sorted

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_421
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 47ff293a6dc286cca3d6fcb1c165d42f7cfd0d99
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Jan 15 10:42:52 2020 +0800

    fix that selected seq files for merge are not sorted
---
 .../merge/selector/MaxFileMergeFileSelector.java   |   8 +-
 .../iotdb/db/integration/IoTDBMergeTest.java       | 134 ++++++++++++++++++++-
 2 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
index d3451fd..3e87d89 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
@@ -65,6 +65,7 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
   private Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
 
   List<TsFileResource> selectedUnseqFiles;
+  List<Integer> selectedSeqFileIndices;
   List<TsFileResource> selectedSeqFiles;
 
   private Collection<Integer> tmpSelectedSeqFiles;
@@ -134,6 +135,7 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
 
   void select(boolean useTightBound) throws IOException {
     tmpSelectedSeqFiles = new HashSet<>();
+    selectedSeqFileIndices = new ArrayList<>();
     seqSelected = new boolean[resource.getSeqFiles().size()];
     seqSelectedNum = 0;
     selectedSeqFiles = new ArrayList<>();
@@ -186,7 +188,7 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
         for (Integer seqIdx : tmpSelectedSeqFiles) {
           seqSelected[seqIdx] = true;
           seqSelectedNum++;
-          selectedSeqFiles.add(resource.getSeqFiles().get(seqIdx));
+          selectedSeqFileIndices.add(seqIdx);
         }
         totalCost += newCost;
         logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
@@ -197,6 +199,10 @@ public class MaxFileMergeFileSelector implements IMergeFileSelector {
       unseqIndex++;
       timeConsumption = System.currentTimeMillis() - startTime;
     }
+    selectedSeqFileIndices.sort(null);
+    for (Integer i : selectedSeqFileIndices) {
+      selectedSeqFiles.add(resource.getSeqFiles().get(i));
+    }
   }
 
   private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index e402fd8..336f455 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -20,12 +20,16 @@
 package org.apache.iotdb.db.integration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -36,18 +40,21 @@ import org.slf4j.LoggerFactory;
 
 public class IoTDBMergeTest {
   private static final Logger logger = LoggerFactory.getLogger(IoTDBMergeTest.class);
-
+  private long prevPartitionInterval;
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
 
     EnvironmentUtils.envSetUp();
+    prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1);
     Class.forName(Config.JDBC_DRIVER_NAME);
   }
 
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
   }
 
   @Test
@@ -97,4 +104,129 @@ public class IoTDBMergeTest {
       }
     }
   }
+
+  @Test
+  public void testInvertedOrder() {
+    // case: seq data and unseq data are written in reverted order
+    // e.g.: write 1. seq [10, 20), 2. seq [20, 30), 3. unseq [20, 30), 4. unseq [10, 20)
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      for (int i = 1; i <= 3; i++) {
+        try {
+          statement.execute("CREATE TIMESERIES root.mergeTest.s" + i + " WITH DATATYPE=INT64,"
+              + "ENCODING=PLAIN");
+        } catch (SQLException e) {
+          // ignore
+        }
+      }
+
+      for (int j = 10; j < 20; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+1, j+2, j+3));
+      }
+      statement.execute("FLUSH");
+      for (int j = 20; j < 30; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+1, j+2, j+3));
+      }
+      statement.execute("FLUSH");
+
+      for (int j = 20; j < 30; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+10, j+20, j+30));
+      }
+      statement.execute("FLUSH");
+      for (int j = 10; j < 20; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+10, j+20, j+30));
+      }
+      statement.execute("FLUSH");
+
+      statement.execute("MERGE");
+
+      int cnt;
+      try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+        cnt = 0;
+        while (resultSet.next()) {
+          long time = resultSet.getLong("Time");
+          long s1 = resultSet.getLong("root.mergeTest.s1");
+          long s2 = resultSet.getLong("root.mergeTest.s2");
+          long s3 = resultSet.getLong("root.mergeTest.s3");
+          assertEquals(cnt + 10, time);
+          assertEquals(time + 10, s1);
+          assertEquals(time + 20, s2);
+          assertEquals(time + 30, s3);
+          cnt++;
+        }
+      }
+      assertEquals(20, cnt);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCrossPartition() throws SQLException, StorageEngineException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      for (int i = 1; i <= 3; i++) {
+        try {
+          statement.execute("CREATE TIMESERIES root.mergeTest.s" + i + " WITH DATATYPE=INT64,"
+              + "ENCODING=PLAIN");
+        } catch (SQLException e) {
+          // ignore
+        }
+      }
+
+      // file in partition
+      for (int k = 0; k < 7; k++) {
+        // partition num
+        for (int i = 0; i < 10; i++) {
+          // sequence files
+          for (int j = i * 1000 + 300 + k * 100; j <= i * 1000 + 399 + k * 100; j++) {
+            statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+                + "%d,%d)", j, j+1, j+2, j+3));
+          }
+          statement.execute("FLUSH");
+          // unsequence files
+          for (int j = i * 1000 + k * 100; j <= i * 1000 + 99 + k * 100; j++) {
+            statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+                + "%d,%d)", j, j+10, j+20, j+30));
+          }
+          statement.execute("FLUSH");
+        }
+      }
+
+      //statement.execute("MERGE");
+      StorageEngine.getInstance().mergeAll(false);
+
+      int cnt;
+      try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+        cnt = 0;
+        while (resultSet.next()) {
+          long time = resultSet.getLong("Time");
+          long s1 = resultSet.getLong("root.mergeTest.s1");
+          long s2 = resultSet.getLong("root.mergeTest.s2");
+          long s3 = resultSet.getLong("root.mergeTest.s3");
+          assertEquals(cnt, time);
+          if (time % 1000 < 700) {
+            assertEquals(time + 10, s1);
+            assertEquals(time + 20, s2);
+            assertEquals(time + 30, s3);
+          } else {
+            assertEquals(time + 1, s1);
+            assertEquals(time + 2, s2);
+            assertEquals(time + 3, s3);
+          }
+          cnt++;
+        }
+      }
+      assertEquals(10000, cnt);
+    }
+  }
 }