You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2021/12/09 02:10:10 UTC
[iotdb] branch master updated: [IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d2116cc [IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)
d2116cc is described below
commit d2116cca2a13c68921de444e62dea8ac97489ebf
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Thu Dec 9 10:09:41 2021 +0800
[IOTDB-2124] the filtering condition does not take effect for last query in cluster (#4539)
[IOTDB-2124] the filtering condition does not take effect for last query in cluster
---
.../cluster/client/sync/SyncClientAdaptor.java | 5 +-
.../iotdb/cluster/query/LocalQueryExecutor.java | 2 +-
.../query/last/ClusterLastQueryExecutor.java | 29 ++++---
.../cluster/client/sync/SyncClientAdaptorTest.java | 1 +
.../iotdb/cluster/common/TestAsyncDataClient.java | 11 +++
.../query/last/ClusterLastQueryExecutorTest.java | 95 ++++++++++++++++++++++
6 files changed, 131 insertions(+), 12 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 016185e..36ca50e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -498,6 +498,7 @@ public class SyncClientAdaptor {
AsyncDataClient client,
List<PartialPath> seriesPaths,
List<Integer> dataTypeOrdinals,
+ Filter timeFilter,
QueryContext context,
Map<String, Set<String>> deviceMeasurements,
RaftNode header)
@@ -512,7 +513,9 @@ public class SyncClientAdaptor {
deviceMeasurements,
header,
client.getNode());
-
+ if (timeFilter != null) {
+ request.setFilterBytes(SerializeUtils.serializeFilter(timeFilter));
+ }
client.last(request, handler);
return handler.getResult(ClusterConstant.getReadOperationTimeoutMS());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index c20e74b..dd6ec9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -1012,7 +1012,7 @@ public class LocalQueryExecutor {
queryManager.getQueryContext(request.getRequestor(), request.getQueryId());
List<PartialPath> partialPaths = new ArrayList<>();
for (String path : request.getPaths()) {
- partialPaths.add(new PartialPath(path));
+ partialPaths.add(new MeasurementPath(path));
}
List<TSDataType> dataTypes = new ArrayList<>(request.dataTypeOrdinals.size());
for (Integer dataTypeOrdinal : request.dataTypeOrdinals) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index c4900d1..db6b14d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
@@ -244,12 +246,14 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
if (asyncDataClient == null) {
return null;
}
-
+ Filter timeFilter =
+ (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
buffer =
SyncClientAdaptor.last(
asyncDataClient,
seriesPaths,
dataTypeOrdinals,
+ timeFilter,
context,
queryPlan.getDeviceToMeasurements(),
group.getHeader());
@@ -263,15 +267,20 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
syncDataClient =
ClusterIoTDB.getInstance()
.getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- res =
- syncDataClient.last(
- new LastQueryRequest(
- PartialPath.toStringList(seriesPaths),
- dataTypeOrdinals,
- context.getQueryId(),
- queryPlan.getDeviceToMeasurements(),
- group.getHeader(),
- syncDataClient.getNode()));
+ LastQueryRequest lastQueryRequest =
+ new LastQueryRequest(
+ PartialPath.toStringList(seriesPaths),
+ dataTypeOrdinals,
+ context.getQueryId(),
+ queryPlan.getDeviceToMeasurements(),
+ group.getHeader(),
+ syncDataClient.getNode());
+ Filter timeFilter =
+ (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
+ if (timeFilter != null) {
+ lastQueryRequest.setFilterBytes(SerializeUtils.serializeFilter(timeFilter));
+ }
+ res = syncDataClient.last(lastQueryRequest);
} catch (IOException | TException e) {
// the connection may be broken, close it to avoid it being reused
if (syncDataClient != null) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 1247c4c..f6b3b13 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -438,6 +438,7 @@ public class SyncClientAdaptorTest {
dataClient,
Collections.singletonList(new PartialPath("1")),
Collections.singletonList(TSDataType.INT64.ordinal()),
+ null,
new QueryContext(),
Collections.emptyMap(),
TestUtils.getRaftNode(0, 0)));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 1303df1..92f8ae9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
+import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
@@ -289,4 +290,14 @@ public class TestAsyncDataClient extends AsyncDataClient {
})
.start();
}
+
+ @Override
+ public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
+ new Thread(
+ () ->
+ new DataAsyncService(dataGroupMemberMap.get(request.getHeader()))
+ .last(request, resultHandler))
+ .start();
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java
new file mode 100644
index 0000000..5a71048
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.last;
+
+import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.query.BaseQueryTest;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterLastQueryExecutorTest extends BaseQueryTest {
+
+ @Test
+ public void testLastQueryTimeFilter()
+ throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
+ LastQueryPlan plan = new LastQueryPlan();
+ plan.setDeduplicatedPathsAndUpdate(
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
+ plan.setPaths(plan.getDeduplicatedPaths());
+ IExpression expression = new GlobalTimeExpression(TimeFilter.gtEq(Long.MAX_VALUE));
+ plan.setExpression(expression);
+ QueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+ try {
+ ClusterLastQueryExecutor lastQueryExecutor =
+ new ClusterLastQueryExecutor(plan, testMetaMember);
+ QueryDataSet queryDataSet = lastQueryExecutor.execute(context, plan);
+ assertFalse(queryDataSet.hasNext());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+
+ @Test
+ public void testLastQueryNoTimeFilter()
+ throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
+ LastQueryPlan plan = new LastQueryPlan();
+ plan.setDeduplicatedPathsAndUpdate(
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
+ plan.setPaths(plan.getDeduplicatedPaths());
+ List<ResultColumn> resultColumnList = new ArrayList<>();
+ resultColumnList.add(new ResultColumn(null, "a"));
+ plan.setResultColumns(resultColumnList);
+ QueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
+ try {
+ ClusterLastQueryExecutor lastQueryExecutor =
+ new ClusterLastQueryExecutor(plan, testMetaMember);
+ QueryDataSet queryDataSet = lastQueryExecutor.execute(context, plan);
+ assertTrue(queryDataSet.hasNext());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+}