You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/03/25 07:10:34 UTC
[iotdb] branch master updated: IOTDB-1241 support redirect query
for cluster (#2867)
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9b22439 IOTDB-1241 support redirect query for cluster (#2867)
9b22439 is described below
commit 9b22439d047ed0915004139e4be7cdd12854d507
Author: chaow <cc...@163.com>
AuthorDate: Thu Mar 25 15:10:11 2021 +0800
IOTDB-1241 support redirect query for cluster (#2867)
* IOTDB-1241 support redirect query for cluster
* implement redirect with only partitionTable
---
.../resources/conf/iotdb-cluster.properties | 2 +
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++
.../iotdb/cluster/config/ClusterDescriptor.java | 5 +
.../apache/iotdb/cluster/metadata/CMManager.java | 5 +-
.../cluster/query/ClusterDataQueryExecutor.java | 133 ++++++++++++++++++-
.../query/aggregate/ClusterAggregateExecutor.java | 2 +-
.../groupby/ClusterGroupByVFilterDataSet.java | 2 +-
.../cluster/query/reader/ClusterReaderFactory.java | 26 ++--
.../cluster/query/reader/ClusterTimeGenerator.java | 145 +++++++++++++++++++--
.../query/ClusterDataQueryExecutorTest.java | 83 ++++++++++++
.../query/reader/ClusterTimeGeneratorTest.java | 2 +-
.../main/java/org/apache/iotdb/SessionExample.java | 102 ++++++++++++++-
.../iotdb/db/qp/physical/crud/QueryPlan.java | 10 ++
.../db/query/dataset/AlignByDeviceDataSet.java | 11 ++
.../dataset/RawQueryDataSetWithoutValueFilter.java | 11 ++
.../db/query/executor/RawDataQueryExecutor.java | 25 ++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 78 ++++++++++-
.../org/apache/iotdb/rpc/RedirectException.java | 3 +-
.../java/org/apache/iotdb/session/Session.java | 103 +++++++++++++--
.../apache/iotdb/session/SessionConnection.java | 32 ++++-
thrift/src/main/thrift/rpc.thrift | 3 +
.../tsfile/read/query/dataset/QueryDataSet.java | 48 +++++++
22 files changed, 792 insertions(+), 49 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 3c9fabe..4126539 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -171,3 +171,5 @@ max_client_pernode_permember_number=1000
# we need to wait so much time for other connections to be released until timeout,
# or a new connection will be created.
wait_client_timeout_ms=5000
+
+enable_query_redirect=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 11cedc8..3d1cd32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -164,6 +164,8 @@ public class ClusterConfig {
private boolean openServerRpcPort = false;
+ private boolean enableQueryRedirect = false;
+
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
@@ -467,4 +469,12 @@ public class ClusterConfig {
public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
}
+
+ public boolean isEnableQueryRedirect() {
+ return enableQueryRedirect;
+ }
+
+ public void setEnableQueryRedirect(boolean enableQueryRedirect) {
+ this.enableQueryRedirect = enableQueryRedirect;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 25f85e0..a2488f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -299,6 +299,11 @@ public class ClusterDescriptor {
properties.getProperty(
"wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS()))));
+ config.setEnableQueryRedirect(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 88451b7..da2d6ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1236,11 +1236,12 @@ public class CMManager extends MManager {
try {
Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
logger.debug(
- "{}: get matched paths of {} from {}, result {}",
+ "{}: get matched paths of {} from {}, result {} for {}",
metaGroupMember.getName(),
partitionGroup,
node,
- paths);
+ paths,
+ pathsToQuery);
if (paths != null) {
// query next group
Set<PartialPath> partialPaths = new HashSet<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index fbf8963..da371d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -21,14 +21,17 @@ package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.EmptyIntervalException;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.executor.RawDataQueryExecutor;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -36,7 +39,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +56,8 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
private static final Logger logger = LoggerFactory.getLogger(ClusterDataQueryExecutor.class);
private MetaGroupMember metaGroupMember;
private ClusterReaderFactory readerFactory;
+ private QueryDataSet.EndPoint endPoint = null;
+ private boolean hasLocalReader = false;
ClusterDataQueryExecutor(RawDataQueryPlan plan, MetaGroupMember metaGroupMember) {
super(plan);
@@ -74,6 +81,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
}
List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+ hasLocalReader = false;
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -99,6 +107,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
if (logger.isDebugEnabled()) {
logger.debug("Initialized {} readers for {}", readersOfSelectedSeries.size(), queryPlan);
}
+
return readersOfSelectedSeries;
}
@@ -114,6 +123,128 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
protected TimeGenerator getTimeGenerator(
IExpression queryExpression, QueryContext context, RawDataQueryPlan rawDataQueryPlan)
throws StorageEngineException {
- return new ClusterTimeGenerator(queryExpression, context, metaGroupMember, rawDataQueryPlan);
+ return new ClusterTimeGenerator(
+ queryExpression, context, metaGroupMember, rawDataQueryPlan, false);
+ }
+
+ @Override
+ protected QueryDataSet needRedirect(QueryContext context, boolean hasValueFilter)
+ throws StorageEngineException {
+ if (queryPlan.isEnableRedirect()) {
+ if (hasValueFilter) {
+ // 1. check time Generator has local data
+ ClusterTimeGenerator clusterTimeGenerator =
+ new ClusterTimeGenerator(
+ queryPlan.getExpression(), context, metaGroupMember, queryPlan, true);
+ if (clusterTimeGenerator.isHasLocalReader()) {
+ this.hasLocalReader = true;
+ this.endPoint = null;
+ }
+
+ // 2. check data reader has local data
+ checkReaderHasLocalData(context, true);
+ } else {
+ // check data reader has local data
+ checkReaderHasLocalData(context, false);
+ }
+
+ logger.debug(
+ "redirect queryId {}, {}, {}, {}",
+ context.getQueryId(),
+ hasLocalReader,
+ hasValueFilter,
+ endPoint);
+
+ if (!hasLocalReader) {
+ // dummy dataSet
+ QueryDataSet dataSet = new RawQueryDataSetWithoutValueFilter(context.getQueryId());
+ dataSet.setEndPoint(endPoint);
+ return dataSet;
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings({"squid:S3776", "squid:S1141"})
+ private void checkReaderHasLocalData(QueryContext context, boolean hasValueFilter)
+ throws StorageEngineException {
+ Filter timeFilter = null;
+ if (!hasValueFilter && queryPlan.getExpression() != null) {
+ timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter();
+ }
+
+ // make sure the partition table is new
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
+ } catch (CheckConsistencyException e) {
+ throw new StorageEngineException(e);
+ }
+
+ for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
+ PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
+ TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
+
+ try {
+ List<PartitionGroup> partitionGroups = null;
+ if (hasValueFilter) {
+ partitionGroups = metaGroupMember.routeFilter(null, path);
+ } else {
+ partitionGroups = metaGroupMember.routeFilter(timeFilter, path);
+ }
+
+ for (PartitionGroup partitionGroup : partitionGroups) {
+ if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+ DataGroupMember dataGroupMember =
+ metaGroupMember.getLocalDataMember(
+ partitionGroup.getHeader(),
+ String.format(
+ "Query: %s, time filter: %s, queryId: %d",
+ path, null, context.getQueryId()));
+
+ if (hasValueFilter) {
+ IReaderByTimestamp readerByTimestamp =
+ readerFactory.getReaderByTimestamp(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ context,
+ dataGroupMember,
+ queryPlan.isAscending());
+
+ if (readerByTimestamp != null) {
+ this.hasLocalReader = true;
+ this.endPoint = null;
+ }
+ } else {
+ IPointReader pointReader =
+ readerFactory.getSeriesPointReader(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ timeFilter,
+ null,
+ context,
+ dataGroupMember,
+ queryPlan.isAscending());
+
+ if (pointReader.hasNextTimeValuePair()) {
+ this.hasLocalReader = true;
+ this.endPoint = null;
+ pointReader.close();
+ break;
+ }
+ pointReader.close();
+ }
+ } else if (endPoint == null) {
+ endPoint =
+ new QueryDataSet.EndPoint(
+ partitionGroup.getHeader().getClientIp(),
+ partitionGroup.getHeader().getClientPort());
+ }
+ }
+ } catch (Exception e) {
+ throw new StorageEngineException(e);
+ }
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
index 97f2b58..ba6eba3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregateExecutor.java
@@ -85,7 +85,7 @@ public class ClusterAggregateExecutor extends AggregationExecutor {
@Override
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan rawDataQueryPlan)
throws StorageEngineException {
- return new ClusterTimeGenerator(expression, context, metaMember, rawDataQueryPlan);
+ return new ClusterTimeGenerator(expression, context, metaMember, rawDataQueryPlan, false);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
index 71faf40..a954234 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSet.java
@@ -63,7 +63,7 @@ public class ClusterGroupByVFilterDataSet extends GroupByWithValueFilterDataSet
protected TimeGenerator getTimeGenerator(
IExpression expression, QueryContext context, RawDataQueryPlan rawDataQueryPlan)
throws StorageEngineException {
- return new ClusterTimeGenerator(expression, context, metaGroupMember, rawDataQueryPlan);
+ return new ClusterTimeGenerator(expression, context, metaGroupMember, rawDataQueryPlan, false);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 91819b1..77ffc01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -121,11 +121,11 @@ public class ClusterReaderFactory {
List<IReaderByTimestamp> readers = new ArrayList<>(partitionGroups.size());
for (PartitionGroup partitionGroup : partitionGroups) {
// query each group to get a reader in that group
- readers.add(
+ IReaderByTimestamp readerByTimestamp =
getSeriesReaderByTime(
- partitionGroup, path, deviceMeasurements, context, dataType, ascending));
+ partitionGroup, path, deviceMeasurements, context, dataType, ascending);
+ readers.add(readerByTimestamp);
}
- // merge the readers
return new MergedReaderByTime(readers);
}
@@ -311,13 +311,13 @@ public class ClusterReaderFactory {
* Create an IPointReader of "path" with “timeFilter” and "valueFilter". A synchronization with
* the leader will be performed according to consistency level
*
- * @param path
- * @param dataType
+ * @param path series path
+ * @param dataType data type
* @param timeFilter nullable
* @param valueFilter nullable
- * @param context
- * @return
- * @throws StorageEngineException
+ * @param context query context
+ * @return reader
+ * @throws StorageEngineException encounter exception
*/
public IPointReader getSeriesPointReader(
PartialPath path,
@@ -351,13 +351,13 @@ public class ClusterReaderFactory {
* Create a SeriesReader of "path" with “timeFilter” and "valueFilter". The consistency is not
* guaranteed here and only data slots managed by the member will be queried.
*
- * @param path
- * @param dataType
+ * @param path series path
+ * @param dataType data type
* @param timeFilter nullable
* @param valueFilter nullable
- * @param context
- * @return
- * @throws StorageEngineException
+ * @param context query context
+ * @return reader for series
+ * @throws StorageEngineException encounter exception
*/
private SeriesReader getSeriesReader(
PartialPath path,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index b7b5f51..371c81b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -21,39 +21,59 @@ package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
+import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
+import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
public class ClusterTimeGenerator extends ServerTimeGenerator {
private ClusterReaderFactory readerFactory;
+ private boolean hasLocalReader = false;
+ private QueryDataSet.EndPoint endPoint = null;
/** Constructor of EngineTimeGenerator. */
public ClusterTimeGenerator(
IExpression expression,
QueryContext context,
MetaGroupMember metaGroupMember,
- RawDataQueryPlan rawDataQueryPlan)
+ RawDataQueryPlan rawDataQueryPlan,
+ boolean onlyCheckLocalData)
throws StorageEngineException {
super(context);
this.queryPlan = rawDataQueryPlan;
this.readerFactory = new ClusterReaderFactory(metaGroupMember);
try {
readerFactory.syncMetaGroup();
- constructNode(expression);
+ if (onlyCheckLocalData) {
+ whetherHasLocalDataGroup(expression, metaGroupMember, queryPlan.isAscending());
+ } else {
+ constructNode(expression);
+ }
} catch (IOException | CheckConsistencyException e) {
throw new StorageEngineException(e);
}
@@ -65,20 +85,125 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
Filter filter = expression.getFilter();
PartialPath path = (PartialPath) expression.getSeriesPath();
TSDataType dataType;
+ ManagedSeriesReader mergeReader = null;
try {
dataType =
((CMManager) IoTDB.metaManager)
.getSeriesTypesByPaths(Collections.singletonList(path), null)
.left
.get(0);
- return readerFactory.getSeriesReader(
- path,
- queryPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataType,
- null,
- filter,
- context,
- queryPlan.isAscending());
+ mergeReader =
+ readerFactory.getSeriesReader(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ null,
+ filter,
+ context,
+ queryPlan.isAscending());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return mergeReader;
+ }
+
+ public boolean isHasLocalReader() {
+ return hasLocalReader;
+ }
+
+ public void setHasLocalReader(boolean hasLocalReader) {
+ this.hasLocalReader = hasLocalReader;
+ }
+
+ public QueryDataSet.EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", has local reader:" + hasLocalReader;
+ }
+
+ public void whetherHasLocalDataGroup(
+ IExpression expression, MetaGroupMember metaGroupMember, boolean isAscending)
+ throws IOException {
+ this.hasLocalReader = false;
+ constructNode(expression, metaGroupMember, isAscending);
+ }
+
+ private Node constructNode(
+ IExpression expression, MetaGroupMember metaGroupMember, boolean isAscending)
+ throws IOException {
+ if (expression.getType() == ExpressionType.SERIES) {
+ SingleSeriesExpression singleSeriesExp = (SingleSeriesExpression) expression;
+ checkHasLocalReader(singleSeriesExp, metaGroupMember);
+ return new LeafNode(null);
+ } else {
+ Node leftChild =
+ constructNode(((IBinaryExpression) expression).getLeft(), metaGroupMember, isAscending);
+ Node rightChild =
+ constructNode(((IBinaryExpression) expression).getRight(), metaGroupMember, isAscending);
+
+ if (expression.getType() == ExpressionType.OR) {
+ return new OrNode(leftChild, rightChild, isAscending);
+ } else if (expression.getType() == ExpressionType.AND) {
+ return new AndNode(leftChild, rightChild, isAscending);
+ }
+ throw new UnSupportedDataTypeException(
+ "Unsupported ExpressionType when construct OperatorNode: " + expression.getType());
+ }
+ }
+
+ private void checkHasLocalReader(
+ SingleSeriesExpression expression, MetaGroupMember metaGroupMember) throws IOException {
+ Filter filter = expression.getFilter();
+ PartialPath path = (PartialPath) expression.getSeriesPath();
+ TSDataType dataType;
+ try {
+ dataType =
+ ((CMManager) IoTDB.metaManager)
+ .getSeriesTypesByPaths(Collections.singletonList(path), null)
+ .left
+ .get(0);
+
+ List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(null, path);
+ for (PartitionGroup partitionGroup : partitionGroups) {
+ if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+ DataGroupMember dataGroupMember =
+ metaGroupMember.getLocalDataMember(
+ partitionGroup.getHeader(),
+ String.format(
+ "Query: %s, time filter: %s, queryId: %d", path, null, context.getQueryId()));
+
+ IPointReader pointReader =
+ readerFactory.getSeriesPointReader(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ null,
+ filter,
+ context,
+ dataGroupMember,
+ queryPlan.isAscending());
+
+ if (pointReader.hasNextTimeValuePair()) {
+ this.hasLocalReader = true;
+ this.endPoint = null;
+ pointReader.close();
+ break;
+ }
+ pointReader.close();
+ } else if (endPoint == null) {
+ endPoint =
+ new QueryDataSet.EndPoint(
+ partitionGroup.getHeader().getClientIp(),
+ partitionGroup.getHeader().getClientPort());
+ }
+ }
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index aa48624..bf88b89 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,18 +28,40 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class ClusterDataQueryExecutorTest extends BaseQueryTest {
private ClusterDataQueryExecutor queryExecutor;
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false);
+ }
+
@Test
public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException {
RawDataQueryPlan plan = new RawDataQueryPlan();
@@ -75,4 +98,64 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
+
+ @Test
+ public void testNoFilterWithRedirect() throws StorageEngineException, QueryProcessException {
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList);
+ plan.setDeduplicatedDataTypes(dataTypes);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ assertNull(dataSet.getEndPoint());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+
+ @Test
+ public void testFilterWithValueFilterRedirect()
+ throws StorageEngineException, QueryProcessException, IllegalPathException {
+ IExpression expression =
+ new SingleSeriesExpression(
+ new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0));
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList);
+ plan.setDeduplicatedDataTypes(dataTypes);
+ plan.setExpression(expression);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context);
+ assertNull(dataSet.getEndPoint());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+
+ @Test
+ public void testFilterWithTimeFilterRedirect()
+ throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+ IExpression expression =
+ new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10)));
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList.subList(0, 1));
+ plan.setDeduplicatedDataTypes(dataTypes.subList(0, 1));
+ plan.setExpression(expression);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ assertEquals("ip:port=0.0.0.0:6667", dataSet.getEndPoint().toString());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
index 417b849..402400d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
@@ -62,7 +62,7 @@ public class ClusterTimeGeneratorTest extends BaseQueryTest {
dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(1, 1)));
ClusterTimeGenerator timeGenerator =
- new ClusterTimeGenerator(expression, context, testMetaMember, dataQueryPlan);
+ new ClusterTimeGenerator(expression, context, testMetaMember, dataQueryPlan, false);
for (int i = 3; i <= 8; i++) {
assertTrue(timeGenerator.hasNext());
assertEquals(i, timeGenerator.next());
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 20387de..e8da3f2 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -36,19 +36,22 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+@SuppressWarnings("squid:S106")
public class SessionExample {
private static Session session;
+ private static Session sessionEnableRedirect;
private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
private static final String ROOT_SG1_D1_S4 = "root.sg1.d1.s4";
private static final String ROOT_SG1_D1_S5 = "root.sg1.d1.s5";
private static final String ROOT_SG1_D1 = "root.sg1.d1";
+ private static final String LOCAL_HOST = "127.0.0.1";
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
- session = new Session("127.0.0.1", 6667, "root", "root");
+ session = new Session(LOCAL_HOST, 6667, "root", "root");
session.open(false);
// set session fetchSize
@@ -77,6 +80,17 @@ public class SessionExample {
deleteTimeseries();
setTimeout();
session.close();
+
+ sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+ sessionEnableRedirect.setEnableQueryRedirection(true);
+ sessionEnableRedirect.open(false);
+
+ // set session fetchSize
+ sessionEnableRedirect.setFetchSize(10000);
+
+ insertRecord4Redirect();
+ query4Redirect();
+ sessionEnableRedirect.close();
}
private static void createTimeseries()
@@ -193,6 +207,31 @@ public class SessionExample {
}
}
+ private static void insertRecord4Redirect()
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < 6; i++) {
+ for (int j = 0; j < 2; j++) {
+ String deviceId = "root.redirect" + i + ".d" + j;
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<TSDataType> types = new ArrayList<>();
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 0; time < 5; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(1L + time);
+ values.add(2L + time);
+ values.add(3L + time);
+ session.insertRecord(deviceId, time, measurements, types, values);
+ }
+ }
+ }
+ }
+
private static void insertStrRecord()
throws IoTDBConnectionException, StatementExecutionException {
String deviceId = ROOT_SG1_D1;
@@ -448,6 +487,65 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
+ private static void query4Redirect()
+ throws IoTDBConnectionException, StatementExecutionException {
+ String selectPrefix = "select * from root.redirect";
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(selectPrefix + i + ".d1");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ selectPrefix + i + ".d1 where time >= 1 and time < 10");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ selectPrefix + i + ".d1 where time >= 1 and time < 10 align by device");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ selectPrefix
+ + i
+ + ".d1 where time >= 1 and time < 10 and root.redirect"
+ + i
+ + "d1.s1 > 1");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+ }
+
private static void queryWithTimeout()
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
@@ -526,7 +624,7 @@ public class SessionExample {
}
private static void setTimeout() throws StatementExecutionException {
- Session tempSession = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+ Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000);
tempSession.setTimeout(60000);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index bcd5b05..a188db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -42,6 +42,8 @@ public abstract class QueryPlan extends PhysicalPlan {
private Map<String, Integer> pathToIndex = new HashMap<>();
+ private boolean enableRedirect = false;
+
public QueryPlan() {
super(true);
setOperatorType(Operator.OperatorType.QUERY);
@@ -126,4 +128,12 @@ public abstract class QueryPlan extends PhysicalPlan {
throws IllegalPathException {
return columnForReader;
}
+
+ public boolean isEnableRedirect() {
+ return enableRedirect;
+ }
+
+ public void setEnableRedirect(boolean enableRedirect) {
+ this.enableRedirect = enableRedirect;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index f8ce3ea..b37a16e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
@@ -108,6 +109,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
this.dataSetType = DataSetType.QUERY;
this.rawDataQueryPlan = new RawDataQueryPlan();
this.rawDataQueryPlan.setAscending(alignByDevicePlan.isAscending());
+ // only redirect query for raw data query
+ this.rawDataQueryPlan.setEnableRedirect(alignByDevicePlan.isEnableRedirect());
}
this.curDataSetInitialized = false;
@@ -198,6 +201,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
throw new IOException(e);
}
+ if (currentDataSet.getEndPoint() != null) {
+ org.apache.iotdb.service.rpc.thrift.EndPoint endPoint =
+ new org.apache.iotdb.service.rpc.thrift.EndPoint();
+ endPoint.setIp(currentDataSet.getEndPoint().getIp());
+ endPoint.setPort(currentDataSet.getEndPoint().getPort());
+ throw new RedirectException(endPoint);
+ }
+
if (currentDataSet.hasNext()) {
curDataSetInitialized = true;
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 63f08c1..c46fe1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -180,6 +180,17 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
init();
}
+ /**
+ * Dummy dataSet for redirect query.
+ *
+ * @param queryId queryId for the query.
+ */
+ public RawQueryDataSetWithoutValueFilter(long queryId) {
+ this.queryId = queryId;
+ blockingQueueArray = new BlockingQueue[0];
+ timeHeap = new TimeSelector(0, ascending);
+ }
+
private void init() throws IOException, InterruptedException {
timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
for (int i = 0; i < seriesReaderList.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 042934a..fc75d0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -61,6 +61,10 @@ public class RawDataQueryExecutor {
/** without filter or with global time filter. */
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, QueryProcessException {
+ QueryDataSet dataSet = needRedirect(context, false);
+ if (dataSet != null) {
+ return dataSet;
+ }
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
try {
return new RawQueryDataSetWithoutValueFilter(
@@ -79,6 +83,10 @@ public class RawDataQueryExecutor {
public final QueryDataSet executeNonAlign(QueryContext context)
throws StorageEngineException, QueryProcessException {
+ QueryDataSet dataSet = needRedirect(context, false);
+ if (dataSet != null) {
+ return dataSet;
+ }
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
return new NonAlignEngineDataSet(
context.getQueryId(),
@@ -133,6 +141,11 @@ public class RawDataQueryExecutor {
*/
public final QueryDataSet executeWithValueFilter(QueryContext context)
throws StorageEngineException, QueryProcessException {
+ QueryDataSet dataSet = needRedirect(context, true);
+ if (dataSet != null) {
+ return dataSet;
+ }
+
TimeGenerator timestampGenerator =
getTimeGenerator(queryPlan.getExpression(), context, queryPlan);
List<Boolean> cached =
@@ -196,4 +209,16 @@ public class RawDataQueryExecutor {
throws StorageEngineException {
return new ServerTimeGenerator(expression, context, queryPlan);
}
+
+ /**
+ * Check whether need to redirect query to other node.
+ *
+ * @param context query context
+ * @param hasValueFilter if has value filter, we need to check timegenerator
+ * @return dummyDataSet to avoid more cost, if null, no need
+ */
+ protected QueryDataSet needRedirect(QueryContext context, boolean hasValueFilter)
+ throws StorageEngineException, QueryProcessException {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6aac0c7..eb7571f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,8 +83,10 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -172,6 +174,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private static final String INFO_NOT_ALLOWED_IN_BATCH_ERROR =
"The query statement is not allowed in batch: ";
+ private static final String INFO_INTERRUPT_ERROR =
+ "Current Thread interrupted when dealing with request {}";
+
private static final int MAX_SIZE =
IoTDBDescriptor.getInstance().getConfig().getQueryCacheSizeInMetric();
private static final int DELETE_SIZE = 20;
@@ -531,8 +536,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: executeUpdateStatement(physicalPlan, req.getSessionId());
+ } catch (InterruptedException e) {
+ LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+ Thread.currentThread().interrupt();
+ return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
}
@@ -557,9 +567,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ } catch (InterruptedException e) {
+ LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+ Thread.currentThread().interrupt();
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "executing executeQueryStatement"));
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "executing executeQueryStatement"));
@@ -582,9 +598,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
config.getQueryTimeoutThreshold(),
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
+ } catch (InterruptedException e) {
+ LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+ Thread.currentThread().interrupt();
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "executing executeRawDataQuery"));
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "executing executeRawDataQuery"));
@@ -595,14 +617,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, UDFPlan,
* some AuthorPlan
*/
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ @SuppressWarnings({"squid:S3776", "squid:S1141"}) // Suppress high Cognitive Complexity warning
private TSExecuteStatementResp internalExecuteQueryStatement(
String statement,
long statementId,
PhysicalPlan plan,
int fetchSize,
long timeout,
- String username)
+ String username,
+ boolean enableRedirect)
throws QueryProcessException, SQLException, StorageEngineException,
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
@@ -644,8 +667,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
resp = getQueryColumnHeaders(plan, username);
}
+ if (plan instanceof QueryPlan) {
+ ((QueryPlan) plan).setEnableRedirect(enableRedirect);
+ }
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan, fetchSize);
+
+ if (newDataSet.getEndPoint() != null && enableRedirect) {
+ // redirect query
+ LOGGER.debug(
+ "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(
+ new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(queryId);
+ return resp;
+ }
+
if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
resp = getListDataSetHeaders(newDataSet);
} else if (plan instanceof UDFPlan) {
@@ -662,7 +702,27 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (newDataSet instanceof DirectNonAlignDataSet) {
resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
} else {
- resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+ try {
+ TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+ resp.setQueryDataSet(tsQueryDataSet);
+ } catch (RedirectException e) {
+ LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+ if (enableRedirect) {
+ // redirect query
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(e.getEndPoint());
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(queryId);
+ return resp;
+ } else {
+ LOGGER.error(
+ "execute {} error, if session does not support redirect,"
+ + " should not throw redirection exception.",
+ statement,
+ e);
+ }
+ }
}
resp.setQueryId(queryId);
@@ -976,6 +1036,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
queryTimeManager.unRegisterQuery(req.queryId);
return resp;
}
+ } catch (InterruptedException e) {
+ LOGGER.error(INFO_INTERRUPT_ERROR, req, e);
+ Thread.currentThread().interrupt();
+ return RpcUtils.getTSFetchResultsResp(
+ onNPEOrUnexpectedException(
+ e, "executing fetchResults", TSStatusCode.INTERNAL_SERVER_ERROR));
} catch (Exception e) {
releaseQueryResourceNoExceptions(req.queryId);
return RpcUtils.getTSFetchResultsResp(
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index e84a96a..15f3157 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.rpc;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import java.io.IOException;
import java.util.Map;
-public class RedirectException extends Exception {
+public class RedirectException extends IOException {
private final EndPoint endPoint;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b873ade..1a2fa6f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -66,6 +66,8 @@ public class Session {
protected static final TSProtocolVersion protocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
+ public static final String MSG_DONOT_ENABLE_REDIRECT =
+ "Query do not enable redirect," + " please confirm the session and server conf.";
protected String username;
protected String password;
protected int fetchSize;
@@ -94,6 +96,8 @@ public class Session {
protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
+ protected boolean enableQueryRedirection = false;
+
public Session(String host, int rpcPort) {
this(
host,
@@ -254,9 +258,10 @@ public class Session {
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
metaSessionConnection = defaultSessionConnection;
isClosed = false;
- if (enableCacheLeader) {
+ if (enableCacheLeader || enableQueryRedirection) {
deviceIdToEndpoint = new HashMap<>();
endPointToSessionConnection = new HashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
@@ -452,7 +457,7 @@ public class Session {
*/
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
- return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ return executeStatementMayRedirect(sql, timeout);
}
/**
@@ -467,7 +472,42 @@ public class Session {
if (timeoutInMs < 0) {
throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
}
- return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ return executeStatementMayRedirect(sql, timeoutInMs);
+ }
+
+ /**
+ * execute the query, may redirect query to other node.
+ *
+ * @param sql the query statement
+ * @param timeoutInMs time in ms
+ * @return data set
+ * @throws StatementExecutionException statement is not right
+ * @throws IoTDBConnectionException the network is not good
+ */
+ private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+ return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ logger.debug(
+ "{} redirect query {} to {}",
+ defaultSessionConnection.getEndPoint(),
+ sql,
+ e.getEndPoint());
+ // retry
+ try {
+ return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ } catch (RedirectException redirectException) {
+ logger.error("{} redirect twice", sql, redirectException);
+ throw new StatementExecutionException(sql + " redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
/**
@@ -484,16 +524,32 @@ public class Session {
* query eg. select * from paths where time >= startTime and time < endTime time interval include
* startTime and exclude endTime
*
- * @param paths
+ * @param paths series path
* @param startTime included
* @param endTime excluded
- * @return
- * @throws StatementExecutionException
- * @throws IoTDBConnectionException
+ * @return data set
+ * @throws StatementExecutionException statement is not right
+ * @throws IoTDBConnectionException the network is not good
*/
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
- return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ try {
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+ // retry
+ try {
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ } catch (RedirectException redirectException) {
+ logger.error("Redirect twice", redirectException);
+ throw new StatementExecutionException("Redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+ }
+ }
}
/**
@@ -585,6 +641,29 @@ public class Session {
}
}
+ private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException {
+ if (enableQueryRedirection) {
+ SessionConnection connection =
+ endPointToSessionConnection.computeIfAbsent(
+ endPoint,
+ k -> {
+ try {
+ SessionConnection sessionConnection =
+ constructSessionConnection(this, endPoint, zoneId);
+ sessionConnection.setEnableRedirect(enableQueryRedirection);
+ return sessionConnection;
+ } catch (IoTDBConnectionException ex) {
+ tmp.set(ex);
+ return null;
+ }
+ });
+ if (connection == null) {
+ throw new IoTDBConnectionException(tmp.get());
+ }
+ defaultSessionConnection = connection;
+ }
+ }
+
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
@@ -1454,4 +1533,12 @@ public class Session {
throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
}
}
+
+ public boolean isEnableQueryRedirection() {
+ return enableQueryRedirection;
+ }
+
+ public void setEnableQueryRedirection(boolean enableQueryRedirection) {
+ this.enableQueryRedirection = enableQueryRedirection;
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 7a43483..f2b6665 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -69,6 +69,7 @@ public class SessionConnection {
private long statementId;
private ZoneId zoneId;
private EndPoint endPoint;
+ private boolean enableRedirect = false;
// TestOnly
public SessionConnection() {}
@@ -254,7 +255,11 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = null;
try {
- dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+ try {
+ dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+ } catch (RedirectException e) {
+ throw new StatementExecutionException("need to redirect query, should not see this.", e);
+ }
return dataSet.hasNext();
} finally {
if (dataSet != null) {
@@ -264,13 +269,15 @@ public class SessionConnection {
}
protected SessionDataSet executeQueryStatement(String sql, long timeout)
- throws StatementExecutionException, IoTDBConnectionException {
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeout);
TSExecuteStatementResp execResp;
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
execResp = client.executeQueryStatement(execReq);
+ RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
@@ -304,6 +311,7 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
} catch (TException e) {
@@ -322,13 +330,15 @@ public class SessionConnection {
}
protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
- throws StatementExecutionException, IoTDBConnectionException {
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSRawDataQueryReq execReq =
new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
execReq.setFetchSize(session.fetchSize);
TSExecuteStatementResp execResp;
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
execResp = client.executeRawDataQuery(execReq);
+ RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
@@ -660,4 +670,20 @@ public class SessionConnection {
}
return flag;
}
+
+ public boolean isEnableRedirect() {
+ return enableRedirect;
+ }
+
+ public void setEnableRedirect(boolean enableRedirect) {
+ this.enableRedirect = enableRedirect;
+ }
+
+ public EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 83cb70c..94892fa 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -117,6 +117,8 @@ struct TSExecuteStatementReq {
4: optional i32 fetchSize
5: optional i64 timeout
+
+ 6: optional bool enableRedirectQuery;
}
struct TSExecuteBatchStatementReq{
@@ -276,6 +278,7 @@ struct TSRawDataQueryReq {
4: required i64 startTime
5: required i64 endTime
6: required i64 statementId
+ 7: optional bool enableRedirectQuery;
}
struct TSCreateMultiTimeseriesReq {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index c68a0e0..453403e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -35,6 +35,46 @@ public abstract class QueryDataSet {
protected int alreadyReturnedRowNum = 0;
protected int fetchSize = 10000;
protected boolean ascending;
+ /*
+ * whether current data group has data for query.
+ * If not null(must be in cluster mode),
+ * we need to redirect the query to any data group which has some data to speed up query.
+ */
+ protected EndPoint endPoint = null;
+
+ /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */
+ public static class EndPoint {
+ private String ip = null;
+ private int port = 0;
+
+ public EndPoint(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ }
+
+ public EndPoint() {}
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return "ip:port=" + ip + ":" + port;
+ }
+ }
public QueryDataSet() {}
@@ -119,4 +159,12 @@ public abstract class QueryDataSet {
public boolean hasLimit() {
return rowLimit > 0;
}
+
+ public EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}