You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/23 11:46:08 UTC

[iotdb] 01/01: [IOTDB-2773] fix overlapped data should be consumed first bug

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-2773-013
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e8da65d4d2b5df646cf236cc416811556ce7a962
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 23 19:44:58 2022 +0800

    [IOTDB-2773] fix overlapped data should be consumed first bug
---
 .../iotdb/db/integration/IoTDBMaxTimeQueryIT.java  | 134 +++++++++++++++++++++
 .../iotdb/db/query/reader/series/SeriesReader.java |  56 +++++----
 2 files changed, 161 insertions(+), 29 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java
new file mode 100644
index 0000000..a881d98
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
+public class IoTDBMaxTimeQueryIT {
+
+  private static int numOfPointsPerPage;
+  private static boolean enableSeqSpaceCompaction;
+  private static boolean enableUnseqSpaceCompaction;
+  private static boolean enableCrossSpaceCompaction;
+
+  private static final String[] sqls =
+      new String[] {
+        "insert into root.sg1.d1(time, s1) values(1, 1.0)",
+        "insert into root.sg1.d1(time, s1) values(2, 2.0)",
+        "insert into root.sg1.d1(time, s1) values(6, 2.0)",
+        "insert into root.sg1.d1(time, s1) values(7, 2.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(8, 8.0)",
+        "insert into root.sg1.d1(time, s1) values(9, 9.0)",
+        "insert into root.sg1.d1(time, s1) values(10, 10.0)",
+        "insert into root.sg1.d1(time, s1) values(11, 11.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(13, 13.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(4, 4.0)",
+        "insert into root.sg1.d1(time, s1) values(12, 12.0)",
+        "flush",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    // TODO When the aligned time series support compaction, we need to set compaction to true
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(2);
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void maxTimeTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"12"};
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute("select max_time(s1) from root.sg1.d1 where time <= 12");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(retArray[cnt], resultSet.getString("max_time(root.sg1.d1.s1)"));
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void insertData() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      // create aligned and non-aligned time series
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 615c685..ab5fbb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -32,10 +32,7 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReade
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -594,35 +591,36 @@ public class SeriesReader {
       addTotalPageNumInTracing(context.getQueryId(), pageReaderList.size());
     }
 
-    pageReaderList.forEach(
-        pageReader -> {
-          if (chunkMetaData.isSeq()) {
-            // addLast for asc; addFirst for desc
-            if (orderUtils.getAscending()) {
-              seqPageReaders.add(
-                  new VersionPageReader(
-                      chunkMetaData.getVersion(),
-                      chunkMetaData.getOffsetOfChunkHeader(),
-                      pageReader,
-                      true));
-            } else {
-              seqPageReaders.add(
-                  0,
+    if (chunkMetaData.isSeq()) {
+      if (orderUtils.getAscending()) {
+        for (IPageReader iPageReader : pageReaderList) {
+          seqPageReaders.add(
+              new VersionPageReader(
+                  chunkMetaData.getVersion(),
+                  chunkMetaData.getOffsetOfChunkHeader(),
+                  iPageReader,
+                  true));
+        }
+      } else {
+        for (int i = pageReaderList.size() - 1; i >= 0; i--) {
+          seqPageReaders.add(
+              new VersionPageReader(
+                  chunkMetaData.getVersion(),
+                  chunkMetaData.getOffsetOfChunkHeader(),
+                  pageReaderList.get(i),
+                  true));
+        }
+      }
+    } else {
+      pageReaderList.forEach(
+          pageReader ->
+              unSeqPageReaders.add(
                   new VersionPageReader(
                       chunkMetaData.getVersion(),
                       chunkMetaData.getOffsetOfChunkHeader(),
                       pageReader,
-                      true));
-            }
-          } else {
-            unSeqPageReaders.add(
-                new VersionPageReader(
-                    chunkMetaData.getVersion(),
-                    chunkMetaData.getOffsetOfChunkHeader(),
-                    pageReader,
-                    false));
-          }
-        });
+                      false)));
+    }
   }
 
   private void addTotalPageNumInTracing(long queryId, int pageNum) {