You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 05:45:01 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 35fc9c68c9 [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
35fc9c68c9 is described below
commit 35fc9c68c92f808459627074bcf6450fa50804cf
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Apr 28 13:44:55 2023 +0800
[To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
---
.../db/it/last/IoTDBLastQueryLastCache2IT.java | 60 +++++++++
.../db/it/last/IoTDBLastQueryLastCacheIT.java | 145 +++++++++++++++++++++
.../org/apache/iotdb/commons/path/AlignedPath.java | 6 +
.../apache/iotdb/db/metadata/utils/MetaUtils.java | 11 +-
.../execution/exchange/sink/LocalSinkChannel.java | 12 +-
.../mpp/execution/exchange/sink/SinkChannel.java | 15 ++-
.../plan/planner/distribution/SourceRewriter.java | 11 ++
7 files changed, 254 insertions(+), 6 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java
new file mode 100644
index 0000000000..ecf2742775
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastQueryLastCache2IT extends IoTDBLastQueryLastCacheIT {
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // with lastCache
+ EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(true);
+ EnvFactory.getEnv().initClusterEnvironment();
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ prepareData(SQLs);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
new file mode 100644
index 0000000000..40e8adf8a0
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastQueryLastCacheIT {
+ protected static final String[] SQLs =
+ new String[] {
+ "create aligned timeseries root.ln_1.tb_6141(machineExit_BOOLEAN BOOLEAN encoding=RLE,`waterNH4-N_DOUBLE` DOUBLE encoding=GORILLA,status_BOOLEAN BOOLEAN encoding=RLE,11_TEXT TEXT encoding=PLAIN,waterInterval_DOUBLE DOUBLE encoding=GORILLA,content_TEXT TEXT encoding=PLAIN, machineOn_BOOLEAN BOOLEAN encoding=RLE,enum_INT32 INT32 encoding=RLE,waterTP_DOUBLE DOUBLE encoding=GORILLA,fluidVelocity_DOUBLE DOUBLE encoding=GORILLA,CO2_DOUBLE DOUBLE encoding=GORILLA,`switch_BOOLEAN` BOOLEA [...]
+ "alter timeseries root.ln_1.tb_6141.machineExit_BOOLEAN upsert alias=machineExit;",
+ "alter timeseries root.ln_1.tb_6141.fluidVelocity_DOUBLE upsert alias=fluidVelocity;",
+ "alter timeseries root.ln_1.tb_6141.CO2_DOUBLE upsert alias=CO2;",
+ "alter timeseries root.ln_1.tb_6141.machineOn_BOOLEAN upsert alias=machineOn;",
+ "alter timeseries root.ln_1.tb_6141.waterInterval_DOUBLE upsert alias=waterInterval;",
+ "alter timeseries root.ln_1.tb_6141.status_BOOLEAN upsert alias=status;",
+ "alter timeseries root.ln_1.tb_6141.enum_INT32 upsert alias=enum;",
+ "alter timeseries root.ln_1.tb_6141.waterTP_DOUBLE upsert alias=waterTP;",
+ "alter timeseries root.ln_1.tb_6141.content_TEXT upsert alias=content;",
+ "alter timeseries root.ln_1.tb_6141.`waterNH4-N_DOUBLE` upsert alias=`waterNH4-N`;",
+ "alter timeseries root.ln_1.tb_6141.code_DOUBLE upsert alias=code;",
+ "alter timeseries root.ln_1.tb_6141.11_TEXT upsert alias=`11`;",
+ "alter timeseries root.ln_1.tb_6141.`switch_BOOLEAN` upsert alias=`switch`;",
+ "insert into root.ln_1.tb_6141(time,waterInterval_DOUBLE) aligned values(1679365910000,10.0);",
+ "insert into root.ln_1.tb_6141(time,waterTP_DOUBLE) aligned values(1679365910000,15.0);",
+ "insert into root.ln_1.tb_6141(time,code_DOUBLE) aligned values(1679477545000,2.0);",
+ "insert into root.ln_1.tb_6141(time,content_TEXT) aligned values(1675995566000,52);",
+ "insert into root.ln_1.tb_6141(time,enum_INT32) aligned values(1675995566000,2);",
+ "insert into root.ln_1.tb_6141(time,fluidVelocity_DOUBLE) aligned values(1679365910000,15.0);",
+ "insert into root.ln_1.tb_6141(time,status_BOOLEAN) aligned values(1677033625000,true);",
+ "insert into root.ln_1.tb_6141(time,machineOn_BOOLEAN) aligned values(1675995566000,true);",
+ "insert into root.ln_1.tb_6141(time,machineExit_BOOLEAN) aligned values(1675995566000,false);",
+ "insert into root.ln_1.tb_6141(time,11_TEXT) aligned values(1679365910000,13);",
+ "insert into root.ln_1.tb_6141(time,CO2_DOUBLE) aligned values(1679365910000,12.0);",
+ "insert into root.ln_1.tb_6141(time,`waterNH4-N_DOUBLE`) aligned values(1679365910000,12.0);",
+ "insert into root.ln_1.tb_6141(time,`waterNH4-N_DOUBLE`) aligned values(1679365910000,12.0);",
+ "insert into root.ln_1.tb_6141(time,`switch_BOOLEAN`) aligned values(1675995566000,false);"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // without lastCache
+ EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(false);
+ EnvFactory.getEnv().initClusterEnvironment();
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ prepareData(SQLs);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testLastQuery() {
+ String[] expectedHeader =
+ new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
+ String[] retArray =
+ new String[] {
+ "1679365910000,root.ln_1.tb_6141.11_TEXT,13,TEXT,",
+ "1679365910000,root.ln_1.tb_6141.CO2_DOUBLE,12.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.`waterNH4-N_DOUBLE`,12.0,DOUBLE,",
+ "1679477545000,root.ln_1.tb_6141.code_DOUBLE,2.0,DOUBLE,",
+ "1675995566000,root.ln_1.tb_6141.content_TEXT,52,TEXT,",
+ "1675995566000,root.ln_1.tb_6141.enum_INT32,2,INT32,",
+ "1679365910000,root.ln_1.tb_6141.fluidVelocity_DOUBLE,15.0,DOUBLE,",
+ "1675995566000,root.ln_1.tb_6141.machineExit_BOOLEAN,false,BOOLEAN,",
+ "1675995566000,root.ln_1.tb_6141.machineOn_BOOLEAN,true,BOOLEAN,",
+ "1677033625000,root.ln_1.tb_6141.status_BOOLEAN,true,BOOLEAN,",
+ "1675995566000,root.ln_1.tb_6141.switch_BOOLEAN,false,BOOLEAN,",
+ "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
+ };
+ resultSetEqualTest("select last * from root.ln_1.tb_6141;", expectedHeader, retArray);
+ }
+
+ @Test
+ public void testLastQueryOrderByTimeDesc() {
+ String[] expectedHeader =
+ new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
+ String[] retArray =
+ new String[] {
+ "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
+ "1675995566000,root.ln_1.tb_6141.switch_BOOLEAN,false,BOOLEAN,",
+ "1677033625000,root.ln_1.tb_6141.status_BOOLEAN,true,BOOLEAN,",
+ "1675995566000,root.ln_1.tb_6141.machineOn_BOOLEAN,true,BOOLEAN,",
+ "1675995566000,root.ln_1.tb_6141.machineExit_BOOLEAN,false,BOOLEAN,",
+ "1679365910000,root.ln_1.tb_6141.fluidVelocity_DOUBLE,15.0,DOUBLE,",
+ "1675995566000,root.ln_1.tb_6141.enum_INT32,2,INT32,",
+ "1675995566000,root.ln_1.tb_6141.content_TEXT,52,TEXT,",
+ "1679477545000,root.ln_1.tb_6141.code_DOUBLE,2.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.`waterNH4-N_DOUBLE`,12.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.CO2_DOUBLE,12.0,DOUBLE,",
+ "1679365910000,root.ln_1.tb_6141.11_TEXT,13,TEXT,",
+ };
+ resultSetEqualTest(
+ "select last * from root.ln_1.tb_6141 order by timeseries desc;", expectedHeader, retArray);
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index affcf4b70c..63c7dd3745 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -37,6 +37,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -112,6 +113,11 @@ public class AlignedPath extends PartialPath {
schemaList = new ArrayList<>();
}
+ public void sortMeasurement(Comparator<String> comparator) {
+ measurementList.sort(comparator);
+ schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId, comparator));
+ }
+
@Override
public PartialPath getDevicePath() {
return new PartialPath(Arrays.copyOf(nodes, nodes.length));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index cf0141cbef..f6442f2ec6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -118,11 +118,20 @@ public class MetaUtils {
public static List<PartialPath> groupAlignedSeriesWithOrder(
List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
- List<PartialPath> res = groupAlignedSeries(fullPaths, new HashMap<>());
+ Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
+ List<PartialPath> res = groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
res.sort(
orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
? Comparator.naturalOrder()
: Comparator.reverseOrder());
+ // sort the measurements of AlignedPath
+ Comparator<String> comparator =
+ orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+ ? Comparator.naturalOrder()
+ : Comparator.reverseOrder();
+ for (AlignedPath alignedPath : deviceToAlignedPathMap.values()) {
+ alignedPath.sortMeasurement(comparator);
+ }
return res;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index d24c669c5d..aae1f8f37c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -48,6 +48,8 @@ public class LocalSinkChannel implements ISinkChannel {
private boolean aborted = false;
private boolean closed = false;
+ private boolean invokedOnFinished = false;
+
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
public LocalSinkChannel(SharedTsBlockQueue queue, SinkListener sinkListener) {
@@ -106,7 +108,10 @@ public class LocalSinkChannel implements ISinkChannel {
synchronized (queue) {
if (isFinished()) {
synchronized (this) {
- sinkListener.onFinish(this);
+ if (!invokedOnFinished) {
+ sinkListener.onFinish(this);
+ invokedOnFinished = true;
+ }
}
}
}
@@ -188,7 +193,10 @@ public class LocalSinkChannel implements ISinkChannel {
}
closed = true;
queue.close();
- sinkListener.onFinish(this);
+ if (!invokedOnFinished) {
+ sinkListener.onFinish(this);
+ invokedOnFinished = true;
+ }
}
}
LOGGER.debug("[EndCloseLocalSinkChannel]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 07587ea43b..b7d9309cf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -49,6 +49,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -103,6 +104,8 @@ public class SinkChannel implements ISinkChannel {
private boolean noMoreTsBlocks = false;
+ private final AtomicBoolean invokedOnFinished = new AtomicBoolean(false);
+
/** max bytes this SinkChannel can reserve. */
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
@@ -263,11 +266,17 @@ public class SinkChannel implements ISinkChannel {
.getQueryPool()
.clearMemoryReservationMap(
localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
- sinkListener.onFinish(this);
+ invokeOnFinished();
closed = true;
LOGGER.debug("[EndCloseSinkChannel]");
}
+ private void invokeOnFinished() {
+ if (invokedOnFinished.compareAndSet(false, true)) {
+ sinkListener.onFinish(this);
+ }
+ }
+
@Override
public boolean isClosed() {
return closed;
@@ -347,7 +356,7 @@ public class SinkChannel implements ISinkChannel {
}
}
if (isFinished()) {
- sinkListener.onFinish(this);
+ invokeOnFinished();
}
}
@@ -523,7 +532,7 @@ public class SinkChannel implements ISinkChannel {
}
noMoreTsBlocks = true;
if (isFinished()) {
- sinkListener.onFinish(SinkChannel.this);
+ invokeOnFinished();
}
sinkListener.onEndOfBlocks(SinkChannel.this);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 22efafdb10..16771c6ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -567,6 +567,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return fullPath;
}))
.collect(Collectors.toList()));
+ lastQueryNode
+ .getChildren()
+ .forEach(
+ child -> {
+ if (child instanceof AlignedLastQueryScanNode) {
+ // sort the measurements of AlignedPath for LastQueryMergeOperator
+ ((AlignedLastQueryScanNode) child)
+ .getSeriesPath()
+ .sortMeasurement(Comparator.naturalOrder());
+ }
+ });
} else {
for (PlanNode child : root.getChildren()) {
addSortForEachLastQueryNode(child, orderByParameter);