You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 05:45:01 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 35fc9c68c9 [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
35fc9c68c9 is described below

commit 35fc9c68c92f808459627074bcf6450fa50804cf
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Apr 28 13:44:55 2023 +0800

    [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
---
 .../db/it/last/IoTDBLastQueryLastCache2IT.java     |  60 +++++++++
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      | 145 +++++++++++++++++++++
 .../org/apache/iotdb/commons/path/AlignedPath.java |   6 +
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |  11 +-
 .../execution/exchange/sink/LocalSinkChannel.java  |  12 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |  15 ++-
 .../plan/planner/distribution/SourceRewriter.java  |  11 ++
 7 files changed, 254 insertions(+), 6 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java
new file mode 100644
index 0000000000..ecf2742775
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCache2IT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastQueryLastCache2IT extends IoTDBLastQueryLastCacheIT {
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // with lastCache
+    EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(true);
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(SQLs);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
new file mode 100644
index 0000000000..40e8adf8a0
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.last;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.DATA_TYPE_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESEIRES_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.iotdb.itbase.constant.TestConstant.VALUE_STR;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBLastQueryLastCacheIT {
+  protected static final String[] SQLs =
+      new String[] {
+        "create aligned timeseries root.ln_1.tb_6141(machineExit_BOOLEAN BOOLEAN encoding=RLE,`waterNH4-N_DOUBLE` DOUBLE encoding=GORILLA,status_BOOLEAN BOOLEAN encoding=RLE,11_TEXT TEXT encoding=PLAIN,waterInterval_DOUBLE DOUBLE encoding=GORILLA,content_TEXT TEXT encoding=PLAIN, machineOn_BOOLEAN BOOLEAN encoding=RLE,enum_INT32 INT32 encoding=RLE,waterTP_DOUBLE DOUBLE encoding=GORILLA,fluidVelocity_DOUBLE DOUBLE encoding=GORILLA,CO2_DOUBLE DOUBLE encoding=GORILLA,`switch_BOOLEAN` BOOLEA [...]
+        "alter timeseries root.ln_1.tb_6141.machineExit_BOOLEAN upsert alias=machineExit;",
+        "alter timeseries root.ln_1.tb_6141.fluidVelocity_DOUBLE upsert alias=fluidVelocity;",
+        "alter timeseries root.ln_1.tb_6141.CO2_DOUBLE upsert alias=CO2;",
+        "alter timeseries root.ln_1.tb_6141.machineOn_BOOLEAN upsert alias=machineOn;",
+        "alter timeseries root.ln_1.tb_6141.waterInterval_DOUBLE upsert alias=waterInterval;",
+        "alter timeseries root.ln_1.tb_6141.status_BOOLEAN upsert alias=status;",
+        "alter timeseries root.ln_1.tb_6141.enum_INT32 upsert alias=enum;",
+        "alter timeseries root.ln_1.tb_6141.waterTP_DOUBLE upsert alias=waterTP;",
+        "alter timeseries root.ln_1.tb_6141.content_TEXT upsert alias=content;",
+        "alter timeseries root.ln_1.tb_6141.`waterNH4-N_DOUBLE` upsert alias=`waterNH4-N`;",
+        "alter timeseries root.ln_1.tb_6141.code_DOUBLE upsert alias=code;",
+        "alter timeseries root.ln_1.tb_6141.11_TEXT upsert alias=`11`;",
+        "alter timeseries root.ln_1.tb_6141.`switch_BOOLEAN` upsert alias=`switch`;",
+        "insert into root.ln_1.tb_6141(time,waterInterval_DOUBLE) aligned values(1679365910000,10.0);",
+        "insert into root.ln_1.tb_6141(time,waterTP_DOUBLE) aligned values(1679365910000,15.0);",
+        "insert into root.ln_1.tb_6141(time,code_DOUBLE) aligned values(1679477545000,2.0);",
+        "insert into root.ln_1.tb_6141(time,content_TEXT) aligned values(1675995566000,52);",
+        "insert into root.ln_1.tb_6141(time,enum_INT32) aligned values(1675995566000,2);",
+        "insert into root.ln_1.tb_6141(time,fluidVelocity_DOUBLE) aligned values(1679365910000,15.0);",
+        "insert into root.ln_1.tb_6141(time,status_BOOLEAN) aligned values(1677033625000,true);",
+        "insert into root.ln_1.tb_6141(time,machineOn_BOOLEAN) aligned values(1675995566000,true);",
+        "insert into root.ln_1.tb_6141(time,machineExit_BOOLEAN) aligned values(1675995566000,false);",
+        "insert into root.ln_1.tb_6141(time,11_TEXT) aligned values(1679365910000,13);",
+        "insert into root.ln_1.tb_6141(time,CO2_DOUBLE) aligned values(1679365910000,12.0);",
+        "insert into root.ln_1.tb_6141(time,`waterNH4-N_DOUBLE`) aligned values(1679365910000,12.0);",
+        "insert into root.ln_1.tb_6141(time,`waterNH4-N_DOUBLE`) aligned values(1679365910000,12.0);",
+        "insert into root.ln_1.tb_6141(time,`switch_BOOLEAN`) aligned values(1675995566000,false);"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // without lastCache
+    EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(false);
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      prepareData(SQLs);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testLastQuery() {
+    String[] expectedHeader =
+        new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
+    String[] retArray =
+        new String[] {
+          "1679365910000,root.ln_1.tb_6141.11_TEXT,13,TEXT,",
+          "1679365910000,root.ln_1.tb_6141.CO2_DOUBLE,12.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.`waterNH4-N_DOUBLE`,12.0,DOUBLE,",
+          "1679477545000,root.ln_1.tb_6141.code_DOUBLE,2.0,DOUBLE,",
+          "1675995566000,root.ln_1.tb_6141.content_TEXT,52,TEXT,",
+          "1675995566000,root.ln_1.tb_6141.enum_INT32,2,INT32,",
+          "1679365910000,root.ln_1.tb_6141.fluidVelocity_DOUBLE,15.0,DOUBLE,",
+          "1675995566000,root.ln_1.tb_6141.machineExit_BOOLEAN,false,BOOLEAN,",
+          "1675995566000,root.ln_1.tb_6141.machineOn_BOOLEAN,true,BOOLEAN,",
+          "1677033625000,root.ln_1.tb_6141.status_BOOLEAN,true,BOOLEAN,",
+          "1675995566000,root.ln_1.tb_6141.switch_BOOLEAN,false,BOOLEAN,",
+          "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
+        };
+    resultSetEqualTest("select last * from root.ln_1.tb_6141;", expectedHeader, retArray);
+  }
+
+  @Test
+  public void testLastQueryOrderByTimeDesc() {
+    String[] expectedHeader =
+        new String[] {TIMESTAMP_STR, TIMESEIRES_STR, VALUE_STR, DATA_TYPE_STR};
+    String[] retArray =
+        new String[] {
+          "1679365910000,root.ln_1.tb_6141.waterTP_DOUBLE,15.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.waterInterval_DOUBLE,10.0,DOUBLE,",
+          "1675995566000,root.ln_1.tb_6141.switch_BOOLEAN,false,BOOLEAN,",
+          "1677033625000,root.ln_1.tb_6141.status_BOOLEAN,true,BOOLEAN,",
+          "1675995566000,root.ln_1.tb_6141.machineOn_BOOLEAN,true,BOOLEAN,",
+          "1675995566000,root.ln_1.tb_6141.machineExit_BOOLEAN,false,BOOLEAN,",
+          "1679365910000,root.ln_1.tb_6141.fluidVelocity_DOUBLE,15.0,DOUBLE,",
+          "1675995566000,root.ln_1.tb_6141.enum_INT32,2,INT32,",
+          "1675995566000,root.ln_1.tb_6141.content_TEXT,52,TEXT,",
+          "1679477545000,root.ln_1.tb_6141.code_DOUBLE,2.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.`waterNH4-N_DOUBLE`,12.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.CO2_DOUBLE,12.0,DOUBLE,",
+          "1679365910000,root.ln_1.tb_6141.11_TEXT,13,TEXT,",
+        };
+    resultSetEqualTest(
+        "select last * from root.ln_1.tb_6141 order by timeseries desc;", expectedHeader, retArray);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
index affcf4b70c..63c7dd3745 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedPath.java
@@ -37,6 +37,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
@@ -112,6 +113,11 @@ public class AlignedPath extends PartialPath {
     schemaList = new ArrayList<>();
   }
 
+  public void sortMeasurement(Comparator<String> comparator) {
+    measurementList.sort(comparator);
+    schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId, comparator));
+  }
+
   @Override
   public PartialPath getDevicePath() {
     return new PartialPath(Arrays.copyOf(nodes, nodes.length));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
index cf0141cbef..f6442f2ec6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
@@ -118,11 +118,20 @@ public class MetaUtils {
 
   public static List<PartialPath> groupAlignedSeriesWithOrder(
       List<PartialPath> fullPaths, OrderByParameter orderByParameter) {
-    List<PartialPath> res = groupAlignedSeries(fullPaths, new HashMap<>());
+    Map<String, AlignedPath> deviceToAlignedPathMap = new HashMap<>();
+    List<PartialPath> res = groupAlignedSeries(fullPaths, deviceToAlignedPathMap);
     res.sort(
         orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
             ? Comparator.naturalOrder()
             : Comparator.reverseOrder());
+    // sort the measurements of AlignedPath
+    Comparator<String> comparator =
+        orderByParameter.getSortItemList().get(0).getOrdering() == Ordering.ASC
+            ? Comparator.naturalOrder()
+            : Comparator.reverseOrder();
+    for (AlignedPath alignedPath : deviceToAlignedPathMap.values()) {
+      alignedPath.sortMeasurement(comparator);
+    }
     return res;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index d24c669c5d..aae1f8f37c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -48,6 +48,8 @@ public class LocalSinkChannel implements ISinkChannel {
   private boolean aborted = false;
   private boolean closed = false;
 
+  private boolean invokedOnFinished = false;
+
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
   public LocalSinkChannel(SharedTsBlockQueue queue, SinkListener sinkListener) {
@@ -106,7 +108,10 @@ public class LocalSinkChannel implements ISinkChannel {
     synchronized (queue) {
       if (isFinished()) {
         synchronized (this) {
-          sinkListener.onFinish(this);
+          if (!invokedOnFinished) {
+            sinkListener.onFinish(this);
+            invokedOnFinished = true;
+          }
         }
       }
     }
@@ -188,7 +193,10 @@ public class LocalSinkChannel implements ISinkChannel {
         }
         closed = true;
         queue.close();
-        sinkListener.onFinish(this);
+        if (!invokedOnFinished) {
+          sinkListener.onFinish(this);
+          invokedOnFinished = true;
+        }
       }
     }
     LOGGER.debug("[EndCloseLocalSinkChannel]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 07587ea43b..b7d9309cf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -49,6 +49,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
@@ -103,6 +104,8 @@ public class SinkChannel implements ISinkChannel {
 
   private boolean noMoreTsBlocks = false;
 
+  private final AtomicBoolean invokedOnFinished = new AtomicBoolean(false);
+
   /** max bytes this SinkChannel can reserve. */
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
@@ -263,11 +266,17 @@ public class SinkChannel implements ISinkChannel {
         .getQueryPool()
         .clearMemoryReservationMap(
             localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
-    sinkListener.onFinish(this);
+    invokeOnFinished();
     closed = true;
     LOGGER.debug("[EndCloseSinkChannel]");
   }
 
+  private void invokeOnFinished() {
+    if (invokedOnFinished.compareAndSet(false, true)) {
+      sinkListener.onFinish(this);
+    }
+  }
+
   @Override
   public boolean isClosed() {
     return closed;
@@ -347,7 +356,7 @@ public class SinkChannel implements ISinkChannel {
       }
     }
     if (isFinished()) {
-      sinkListener.onFinish(this);
+      invokeOnFinished();
     }
   }
 
@@ -523,7 +532,7 @@ public class SinkChannel implements ISinkChannel {
         }
         noMoreTsBlocks = true;
         if (isFinished()) {
-          sinkListener.onFinish(SinkChannel.this);
+          invokeOnFinished();
         }
         sinkListener.onEndOfBlocks(SinkChannel.this);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 22efafdb10..16771c6ad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -567,6 +567,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
                         return fullPath;
                       }))
               .collect(Collectors.toList()));
+      lastQueryNode
+          .getChildren()
+          .forEach(
+              child -> {
+                if (child instanceof AlignedLastQueryScanNode) {
+                  // sort the measurements of AlignedPath for LastQueryMergeOperator
+                  ((AlignedLastQueryScanNode) child)
+                      .getSeriesPath()
+                      .sortMeasurement(Comparator.naturalOrder());
+                }
+              });
     } else {
       for (PlanNode child : root.getChildren()) {
         addSortForEachLastQueryNode(child, orderByParameter);