You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2021/12/09 02:10:10 UTC

[iotdb] branch master updated: [IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d2116cc  [IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)
d2116cc is described below

commit d2116cca2a13c68921de444e62dea8ac97489ebf
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Thu Dec 9 10:09:41 2021 +0800

    [IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)
    
    [IOTDB-2124] the filtering condition does not take effect for last query in cluster
---
 .../cluster/client/sync/SyncClientAdaptor.java     |  5 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  2 +-
 .../query/last/ClusterLastQueryExecutor.java       | 29 ++++---
 .../cluster/client/sync/SyncClientAdaptorTest.java |  1 +
 .../iotdb/cluster/common/TestAsyncDataClient.java  | 11 +++
 .../query/last/ClusterLastQueryExecutorTest.java   | 95 ++++++++++++++++++++++
 6 files changed, 131 insertions(+), 12 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 016185e..36ca50e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -498,6 +498,7 @@ public class SyncClientAdaptor {
       AsyncDataClient client,
       List<PartialPath> seriesPaths,
       List<Integer> dataTypeOrdinals,
+      Filter timeFilter,
       QueryContext context,
       Map<String, Set<String>> deviceMeasurements,
       RaftNode header)
@@ -512,7 +513,9 @@ public class SyncClientAdaptor {
             deviceMeasurements,
             header,
             client.getNode());
-
+    if (timeFilter != null) {
+      request.setFilterBytes(SerializeUtils.serializeFilter(timeFilter));
+    }
     client.last(request, handler);
     return handler.getResult(ClusterConstant.getReadOperationTimeoutMS());
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index c20e74b..dd6ec9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -1012,7 +1012,7 @@ public class LocalQueryExecutor {
         queryManager.getQueryContext(request.getRequestor(), request.getQueryId());
     List<PartialPath> partialPaths = new ArrayList<>();
     for (String path : request.getPaths()) {
-      partialPaths.add(new PartialPath(path));
+      partialPaths.add(new MeasurementPath(path));
     }
     List<TSDataType> dataTypes = new ArrayList<>(request.dataTypeOrdinals.size());
     for (Integer dataTypeOrdinal : request.dataTypeOrdinals) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index c4900d1..db6b14d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
@@ -244,12 +246,14 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
       if (asyncDataClient == null) {
         return null;
       }
-
+      Filter timeFilter =
+          (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
       buffer =
           SyncClientAdaptor.last(
               asyncDataClient,
               seriesPaths,
               dataTypeOrdinals,
+              timeFilter,
               context,
               queryPlan.getDeviceToMeasurements(),
               group.getHeader());
@@ -263,15 +267,20 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
         syncDataClient =
             ClusterIoTDB.getInstance()
                 .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
-        res =
-            syncDataClient.last(
-                new LastQueryRequest(
-                    PartialPath.toStringList(seriesPaths),
-                    dataTypeOrdinals,
-                    context.getQueryId(),
-                    queryPlan.getDeviceToMeasurements(),
-                    group.getHeader(),
-                    syncDataClient.getNode()));
+        LastQueryRequest lastQueryRequest =
+            new LastQueryRequest(
+                PartialPath.toStringList(seriesPaths),
+                dataTypeOrdinals,
+                context.getQueryId(),
+                queryPlan.getDeviceToMeasurements(),
+                group.getHeader(),
+                syncDataClient.getNode());
+        Filter timeFilter =
+            (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
+        if (timeFilter != null) {
+          lastQueryRequest.setFilterBytes(SerializeUtils.serializeFilter(timeFilter));
+        }
+        res = syncDataClient.last(lastQueryRequest);
       } catch (IOException | TException e) {
         // the connection may be broken, close it to avoid it being reused
         if (syncDataClient != null) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 1247c4c..f6b3b13 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -438,6 +438,7 @@ public class SyncClientAdaptorTest {
             dataClient,
             Collections.singletonList(new PartialPath("1")),
             Collections.singletonList(TSDataType.INT64.ordinal()),
+            null,
             new QueryContext(),
             Collections.emptyMap(),
             TestUtils.getRaftNode(0, 0)));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 1303df1..92f8ae9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
+import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -289,4 +290,14 @@ public class TestAsyncDataClient extends AsyncDataClient {
             })
         .start();
   }
+
+  @Override
+  public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
+    new Thread(
+            () ->
+                new DataAsyncService(dataGroupMemberMap.get(request.getHeader()))
+                    .last(request, resultHandler))
+        .start();
+  }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java
new file mode 100644
index 0000000..5a71048
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.last;
+
+import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.query.BaseQueryTest;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterLastQueryExecutorTest extends BaseQueryTest {
+
+  @Test
+  public void testLastQueryTimeFilter()
+      throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
+    LastQueryPlan plan = new LastQueryPlan();
+    plan.setDeduplicatedPathsAndUpdate(
+        Collections.singletonList(
+            new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
+    plan.setPaths(plan.getDeduplicatedPaths());
+    IExpression expression = new GlobalTimeExpression(TimeFilter.gtEq(Long.MAX_VALUE));
+    plan.setExpression(expression);
+    QueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+    try {
+      ClusterLastQueryExecutor lastQueryExecutor =
+          new ClusterLastQueryExecutor(plan, testMetaMember);
+      QueryDataSet queryDataSet = lastQueryExecutor.execute(context, plan);
+      assertFalse(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+
+  @Test
+  public void testLastQueryNoTimeFilter()
+      throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
+    LastQueryPlan plan = new LastQueryPlan();
+    plan.setDeduplicatedPathsAndUpdate(
+        Collections.singletonList(
+            new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
+    plan.setPaths(plan.getDeduplicatedPaths());
+    List<ResultColumn> resultColumnList = new ArrayList<>();
+    resultColumnList.add(new ResultColumn(null, "a"));
+    plan.setResultColumns(resultColumnList);
+    QueryContext context =
+        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+    try {
+      ClusterLastQueryExecutor lastQueryExecutor =
+          new ClusterLastQueryExecutor(plan, testMetaMember);
+      QueryDataSet queryDataSet = lastQueryExecutor.execute(context, plan);
+      assertTrue(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
+  }
+}