You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/23 11:46:08 UTC
[iotdb] 01/01: [IOTDB-2773] fix overlapped data should be consumed first bug
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-2773-013
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e8da65d4d2b5df646cf236cc416811556ce7a962
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 23 19:44:58 2022 +0800
[IOTDB-2773] fix overlapped data should be consumed first bug
---
.../iotdb/db/integration/IoTDBMaxTimeQueryIT.java | 134 +++++++++++++++++++++
.../iotdb/db/query/reader/series/SeriesReader.java | 56 +++++----
2 files changed, 161 insertions(+), 29 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java
new file mode 100644
index 0000000..a881d98
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBMaxTimeQueryIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for {@code max_time} when the matching points are written in several batches
+ * separated by {@code flush} statements, and the last batch contains timestamps (4 and 12) that
+ * are lower than one flushed earlier (13).
+ *
+ * <p>{@link #setUp()} switches off the three compaction settings and limits pages to two points;
+ * {@link #tearDown()} restores the values that were in effect before the class ran.
+ */
+public class IoTDBMaxTimeQueryIT {
+
+  // Configuration values captured in setUp() so that tearDown() can restore them.
+  private static int numOfPointsPerPage;
+  private static boolean enableSeqSpaceCompaction;
+  private static boolean enableUnseqSpaceCompaction;
+  private static boolean enableCrossSpaceCompaction;
+
+  // Statements executed, in this order, by insertData(). Timestamps 4 and 12 are written only
+  // after timestamp 13 has already been inserted and flushed.
+  private static final String[] sqls =
+      new String[] {
+        "insert into root.sg1.d1(time, s1) values(1, 1.0)",
+        "insert into root.sg1.d1(time, s1) values(2, 2.0)",
+        "insert into root.sg1.d1(time, s1) values(6, 2.0)",
+        "insert into root.sg1.d1(time, s1) values(7, 2.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(8, 8.0)",
+        "insert into root.sg1.d1(time, s1) values(9, 9.0)",
+        "insert into root.sg1.d1(time, s1) values(10, 10.0)",
+        "insert into root.sg1.d1(time, s1) values(11, 11.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(13, 13.0)",
+        "flush",
+        "insert into root.sg1.d1(time, s1) values(4, 4.0)",
+        "insert into root.sg1.d1(time, s1) values(12, 12.0)",
+        "flush",
+      };
+
+  /**
+   * Prepares the environment, saves the current compaction and page-size settings, overrides
+   * them for this class, and loads the test data.
+   *
+   * @throws Exception if the environment cannot be set up or the data cannot be inserted
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    // Remember the current settings before touching them.
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    // Disable compaction (presumably so the separately flushed batches are not merged before
+    // the query runs -- TODO confirm) and cap every page at two points.
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(2);
+    insertData();
+  }
+
+  /**
+   * Restores the settings saved by {@link #setUp()} and cleans the test environment.
+   *
+   * @throws Exception if the environment cannot be cleaned
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Queries {@code max_time(s1)} with the filter {@code time <= 12} and expects a single row
+   * whose value is 12, the largest inserted timestamp that satisfies the filter.
+   *
+   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
+   */
+  @Test
+  public void maxTimeTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"12"};
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute("select max_time(s1) from root.sg1.d1 where time <= 12");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          // Guard the index so an unexpected extra row is reported as an assertion failure
+          // rather than an ArrayIndexOutOfBoundsException.
+          assertTrue("unexpected extra row at index " + cnt, cnt < retArray.length);
+          assertEquals(retArray[cnt], resultSet.getString("max_time(root.sg1.d1.s1)"));
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+    } catch (SQLException e) {
+      // Fail the test while keeping the SQLException as the cause of the reported failure.
+      throw new AssertionError(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Executes every statement in {@link #sqls} in declaration order over a single connection.
+   *
+   * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
+   * @throws IllegalStateException if connecting or executing a statement fails, so that a broken
+   *     setup is reported directly instead of surfacing later as a wrong query result
+   */
+  private static void insertData() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+
+    } catch (SQLException e) {
+      throw new IllegalStateException("Failed to insert test data", e);
+    }
+  }
+
+  /**
+   * Opens a JDBC connection to the server on the loopback address with the credentials used
+   * throughout this test.
+   *
+   * @return a new connection that the caller must close
+   * @throws SQLException if the connection cannot be established
+   */
+  private static Connection openConnection() throws SQLException {
+    return DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 615c685..ab5fbb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -32,10 +32,7 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReade
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.*;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -594,35 +591,36 @@ public class SeriesReader {
addTotalPageNumInTracing(context.getQueryId(), pageReaderList.size());
}
- pageReaderList.forEach(
- pageReader -> {
- if (chunkMetaData.isSeq()) {
- // addLast for asc; addFirst for desc
- if (orderUtils.getAscending()) {
- seqPageReaders.add(
- new VersionPageReader(
- chunkMetaData.getVersion(),
- chunkMetaData.getOffsetOfChunkHeader(),
- pageReader,
- true));
- } else {
- seqPageReaders.add(
- 0,
+ if (chunkMetaData.isSeq()) {
+ if (orderUtils.getAscending()) {
+ for (IPageReader iPageReader : pageReaderList) {
+ seqPageReaders.add(
+ new VersionPageReader(
+ chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(),
+ iPageReader,
+ true));
+ }
+ } else {
+ for (int i = pageReaderList.size() - 1; i >= 0; i--) {
+ seqPageReaders.add(
+ new VersionPageReader(
+ chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(),
+ pageReaderList.get(i),
+ true));
+ }
+ }
+ } else {
+ pageReaderList.forEach(
+ pageReader ->
+ unSeqPageReaders.add(
new VersionPageReader(
chunkMetaData.getVersion(),
chunkMetaData.getOffsetOfChunkHeader(),
pageReader,
- true));
- }
- } else {
- unSeqPageReaders.add(
- new VersionPageReader(
- chunkMetaData.getVersion(),
- chunkMetaData.getOffsetOfChunkHeader(),
- pageReader,
- false));
- }
- });
+ false)));
+ }
}
private void addTotalPageNumInTracing(long queryId, int pageNum) {