You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/03/18 05:25:57 UTC
[iotdb] 01/01: IOTDB-1241 support redirect query for cluster
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch support_redirect_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 68048a4ca1c979325b98a664ab3053d02dde3f57
Author: chaow <xu...@gmail.com>
AuthorDate: Thu Mar 18 11:54:49 2021 +0800
IOTDB-1241 support redirect query for cluster
---
.../resources/conf/iotdb-cluster.properties | 2 +
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++
.../iotdb/cluster/config/ClusterDescriptor.java | 5 ++
.../apache/iotdb/cluster/metadata/CMManager.java | 7 +-
.../cluster/query/ClusterDataQueryExecutor.java | 65 ++++++++++++++-
.../cluster/query/reader/ClusterReaderFactory.java | 36 +++++++-
.../cluster/query/reader/ClusterTimeGenerator.java | 37 +++++++--
.../cluster/query/reader/ManagedMergeReader.java | 13 +++
.../cluster/query/reader/MergedReaderByTime.java | 10 +++
.../query/ClusterDataQueryExecutorTest.java | 85 +++++++++++++++++++
.../main/java/org/apache/iotdb/SessionExample.java | 97 ++++++++++++++++++++++
.../iotdb/db/qp/physical/crud/QueryPlan.java | 10 +++
.../db/query/dataset/AlignByDeviceDataSet.java | 11 +++
.../dataset/RawQueryDataSetWithoutValueFilter.java | 11 +++
.../db/query/executor/RawDataQueryExecutor.java | 22 +++++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 52 ++++++++++--
.../org/apache/iotdb/rpc/RedirectException.java | 3 +-
.../java/org/apache/iotdb/session/Session.java | 81 +++++++++++++++++-
.../apache/iotdb/session/SessionConnection.java | 32 ++++++-
thrift/src/main/thrift/rpc.thrift | 3 +
.../tsfile/read/query/dataset/QueryDataSet.java | 40 +++++++++
21 files changed, 603 insertions(+), 29 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 1627d43..87283a1 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -166,3 +166,5 @@ max_client_pernode_permember_number=1000
# we need to wait so much time for other connections to be released until timeout,
# or a new connection will be created.
wait_client_timeout_ms=5000
+
+enable_query_redirect=false
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 08f7608..e8a3953 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -161,6 +161,8 @@ public class ClusterConfig {
private boolean openServerRpcPort = false;
+ private boolean enableQueryRedirect = false;
+
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
@@ -456,4 +458,12 @@ public class ClusterConfig {
public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
}
+
+ public boolean isEnableQueryRedirect() {
+ return enableQueryRedirect;
+ }
+
+ public void setEnableQueryRedirect(boolean enableQueryRedirect) {
+ this.enableQueryRedirect = enableQueryRedirect;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d6bbf82..799a7a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -293,6 +293,11 @@ public class ClusterDescriptor {
properties.getProperty(
"wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS()))));
+ config.setEnableQueryRedirect(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_query_redirect", String.valueOf(config.isEnableQueryRedirect()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 6320432..cd572b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1241,11 +1241,12 @@ public class CMManager extends MManager {
try {
Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
logger.debug(
- "{}: get matched paths of {} from {}, result {}",
+ "{}: get matched paths of {} from {}, result {} for {}",
metaGroupMember.getName(),
partitionGroup,
node,
- paths);
+ paths,
+ pathsToQuery);
if (paths != null) {
// query next group
Set<PartialPath> partialPaths = new HashSet<>();
@@ -1397,7 +1398,7 @@ public class CMManager extends MManager {
public Set<String> getAllDevices(List<String> paths) throws MetadataException {
Set<String> results = new HashSet<>();
for (String path : paths) {
- getAllTimeseriesPath(new PartialPath(path)).stream()
+ getDevices(new PartialPath(path)).stream()
.map(PartialPath::getFullPath)
.forEach(results::add);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
index fbf8963..d2e8737 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutor.java
@@ -23,12 +23,15 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.EmptyIntervalException;
import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
import org.apache.iotdb.cluster.query.reader.ClusterTimeGenerator;
+import org.apache.iotdb.cluster.query.reader.ManagedMergeReader;
+import org.apache.iotdb.cluster.query.reader.MergedReaderByTime;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.RawQueryDataSetWithoutValueFilter;
import org.apache.iotdb.db.query.executor.RawDataQueryExecutor;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -36,6 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.slf4j.Logger;
@@ -51,6 +55,8 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
private static final Logger logger = LoggerFactory.getLogger(ClusterDataQueryExecutor.class);
private MetaGroupMember metaGroupMember;
private ClusterReaderFactory readerFactory;
+ private QueryDataSet.EndPoint endPoint = null;
+ private boolean hasLocalReader = false;
ClusterDataQueryExecutor(RawDataQueryPlan plan, MetaGroupMember metaGroupMember) {
super(plan);
@@ -74,6 +80,7 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
}
List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+ hasLocalReader = false;
for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -95,9 +102,22 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
}
readersOfSelectedSeries.add(reader);
+ if (((ManagedMergeReader) reader).getEndPoint() != null) {
+ endPoint = ((ManagedMergeReader) reader).getEndPoint();
+ } else {
+ hasLocalReader = true;
+ }
}
if (logger.isDebugEnabled()) {
- logger.debug("Initialized {} readers for {}", readersOfSelectedSeries.size(), queryPlan);
+ logger.debug(
+ "Initialized {} readers for {} has localReader {}",
+ readersOfSelectedSeries.size(),
+ queryPlan,
+ hasLocalReader);
+ }
+ if (hasLocalReader) {
+ // no need to redirect query
+ endPoint = null;
}
return readersOfSelectedSeries;
}
@@ -106,8 +126,15 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
protected IReaderByTimestamp getReaderByTimestamp(
PartialPath path, Set<String> deviceMeasurements, TSDataType dataType, QueryContext context)
throws StorageEngineException, QueryProcessException {
- return readerFactory.getReaderByTimestamp(
- path, deviceMeasurements, dataType, context, queryPlan.isAscending());
+ IReaderByTimestamp iReaderByTimestamp =
+ readerFactory.getReaderByTimestamp(
+ path, deviceMeasurements, dataType, context, queryPlan.isAscending());
+ if (((MergedReaderByTime) iReaderByTimestamp).getEndPoint() == null) {
+ hasLocalReader = true;
+ } else if (!hasLocalReader && endPoint == null) {
+ endPoint = ((MergedReaderByTime) iReaderByTimestamp).getEndPoint();
+ }
+ return iReaderByTimestamp;
}
@Override
@@ -116,4 +143,36 @@ public class ClusterDataQueryExecutor extends RawDataQueryExecutor {
throws StorageEngineException {
return new ClusterTimeGenerator(queryExpression, context, metaGroupMember, rawDataQueryPlan);
}
+
+ public QueryDataSet.EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
+
+ @Override
+ protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) {
+ ClusterTimeGenerator clusterTimeGenerator = (ClusterTimeGenerator) timeGenerator;
+ logger.debug(
+ "redirect queryId {}, {}, {}, {}, {}",
+ queryId,
+ hasLocalReader,
+ queryPlan.getOperatorType(),
+ endPoint,
+ clusterTimeGenerator);
+ if (hasLocalReader
+ || !queryPlan.isEnableRedirect()
+ || (clusterTimeGenerator != null && clusterTimeGenerator.isHasLocalReader())) {
+ endPoint = null;
+ }
+ if (!hasLocalReader && endPoint != null && queryPlan.isEnableRedirect()) {
+ // dummy dataSet
+ QueryDataSet dataSet = new RawQueryDataSetWithoutValueFilter(queryId);
+ dataSet.setEndPoint(endPoint);
+ return dataSet;
+ }
+ return null;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 91819b1..a68881a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -119,14 +120,29 @@ public class ClusterReaderFactory {
path,
partitionGroups.size());
List<IReaderByTimestamp> readers = new ArrayList<>(partitionGroups.size());
+ QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint();
+ boolean hasLocalReader = false;
for (PartitionGroup partitionGroup : partitionGroups) {
// query each group to get a reader in that group
- readers.add(
+ IReaderByTimestamp iReaderByTimestamp =
getSeriesReaderByTime(
- partitionGroup, path, deviceMeasurements, context, dataType, ascending));
+ partitionGroup, path, deviceMeasurements, context, dataType, ascending);
+ readers.add(iReaderByTimestamp);
+ if (iReaderByTimestamp instanceof SeriesReaderByTimestamp) {
+ hasLocalReader = true;
+ } else if (iReaderByTimestamp instanceof RemoteSeriesReaderByTimestamp) {
+ endPoint.setIp(partitionGroup.getHeader().getClientIp());
+ endPoint.setPort(partitionGroup.getHeader().getClientPort());
+ }
+ }
+ if (hasLocalReader) {
+ // no need redirect query to the endpoint
+ endPoint = null;
}
// merge the readers
- return new MergedReaderByTime(readers);
+ MergedReaderByTime mergedReaderByTime = new MergedReaderByTime(readers);
+ mergedReaderByTime.setEndPoint(endPoint);
+ return mergedReaderByTime;
}
/**
@@ -227,6 +243,8 @@ public class ClusterReaderFactory {
path,
partitionGroups.size());
ManagedMergeReader mergeReader = new ManagedMergeReader(dataType);
+ QueryDataSet.EndPoint endPoint = new QueryDataSet.EndPoint();
+ boolean hasLocalReader = false;
try {
// build a reader for each group and merge them
for (PartitionGroup partitionGroup : partitionGroups) {
@@ -240,11 +258,23 @@ public class ClusterReaderFactory {
context,
dataType,
ascending);
+ if (seriesReader instanceof SeriesRawDataPointReader
+ && seriesReader.hasNextTimeValuePair()) {
+ hasLocalReader = true;
+ } else if (seriesReader instanceof RemoteSimpleSeriesReader) {
+ endPoint.setIp(partitionGroup.getHeader().getClientIp());
+ endPoint.setPort(partitionGroup.getHeader().getClientPort());
+ }
mergeReader.addReader(seriesReader, 0);
}
} catch (IOException | QueryProcessException e) {
throw new StorageEngineException(e);
}
+ if (hasLocalReader) {
+ // no need redirect query to the endpoint
+ endPoint = null;
+ }
+ mergeReader.setEndPoint(endPoint);
return mergeReader;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index b7b5f51..8f88a23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -40,6 +41,7 @@ import java.util.Collections;
public class ClusterTimeGenerator extends ServerTimeGenerator {
private ClusterReaderFactory readerFactory;
+ private boolean hasLocalReader = false;
/** Constructor of EngineTimeGenerator. */
public ClusterTimeGenerator(
@@ -65,22 +67,41 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
Filter filter = expression.getFilter();
PartialPath path = (PartialPath) expression.getSeriesPath();
TSDataType dataType;
+ ManagedSeriesReader mergeReader = null;
try {
dataType =
((CMManager) IoTDB.metaManager)
.getSeriesTypesByPaths(Collections.singletonList(path), null)
.left
.get(0);
- return readerFactory.getSeriesReader(
- path,
- queryPlan.getAllMeasurementsInDevice(path.getDevice()),
- dataType,
- null,
- filter,
- context,
- queryPlan.isAscending());
+ mergeReader =
+ readerFactory.getSeriesReader(
+ path,
+ queryPlan.getAllMeasurementsInDevice(path.getDevice()),
+ dataType,
+ null,
+ filter,
+ context,
+ queryPlan.isAscending());
} catch (Exception e) {
throw new IOException(e);
}
+ if (((ManagedMergeReader) mergeReader).getEndPoint() == null) {
+ hasLocalReader = true;
+ }
+ return mergeReader;
+ }
+
+ public boolean isHasLocalReader() {
+ return hasLocalReader;
+ }
+
+ public void setHasLocalReader(boolean hasLocalReader) {
+ this.hasLocalReader = hasLocalReader;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", has local reader:" + hasLocalReader;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
index e54dede..acaffe4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ManagedMergeReader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.NoSuchElementException;
@@ -38,6 +39,10 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
private BatchData batchData;
private TSDataType dataType;
+ /*
+ * whether need to redirect node to the node
+ */
+ private QueryDataSet.EndPoint endPoint = null;
public ManagedMergeReader(TSDataType dataType) {
this.dataType = dataType;
@@ -91,4 +96,12 @@ public class ManagedMergeReader extends PriorityMergeReader implements ManagedSe
batchData = null;
return ret;
}
+
+ public QueryDataSet.EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
index 2db6a09..ae215f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/MergedReaderByTime.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
import java.util.List;
@@ -27,6 +28,7 @@ import java.util.List;
public class MergedReaderByTime implements IReaderByTimestamp {
private List<IReaderByTimestamp> innerReaders;
+ private QueryDataSet.EndPoint endPoint = null;
public MergedReaderByTime(List<IReaderByTimestamp> innerReaders) {
this.innerReaders = innerReaders;
@@ -44,4 +46,12 @@ public class MergedReaderByTime implements IReaderByTimestamp {
}
return null;
}
+
+ public QueryDataSet.EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(QueryDataSet.EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index aa48624..bee1d3a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.query;
import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -27,18 +28,40 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class ClusterDataQueryExecutorTest extends BaseQueryTest {
private ClusterDataQueryExecutor queryExecutor;
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false);
+ }
+
@Test
public void testNoFilter() throws IOException, StorageEngineException, QueryProcessException {
RawDataQueryPlan plan = new RawDataQueryPlan();
@@ -75,4 +98,66 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
QueryResourceManager.getInstance().endQuery(context.getQueryId());
}
}
+
+ @Test
+ public void testNoFilterWithRedirect()
+ throws IOException, StorageEngineException, QueryProcessException {
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList);
+ plan.setDeduplicatedDataTypes(dataTypes);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ System.out.println(dataSet.getEndPoint());
+ assertNull(dataSet.getEndPoint());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+
+ @Test
+ public void testFilterWithValueFilterRedirect()
+ throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+ IExpression expression =
+ new SingleSeriesExpression(
+ new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0));
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList);
+ plan.setDeduplicatedDataTypes(dataTypes);
+ plan.setExpression(expression);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context);
+ assertNull(dataSet.getEndPoint());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
+
+ @Test
+ public void testFilterWithTimeFilterRedirect()
+ throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
+ IExpression expression =
+ new GlobalTimeExpression(new AndFilter(TimeFilter.gtEq(5), TimeFilter.ltEq(10)));
+ RawDataQueryPlan plan = new RawDataQueryPlan();
+ plan.setDeduplicatedPaths(pathList.subList(0, 1));
+ plan.setDeduplicatedDataTypes(dataTypes.subList(0, 1));
+ plan.setExpression(expression);
+ plan.setEnableRedirect(true);
+ queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
+ RemoteQueryContext context =
+ new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+ try {
+ QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+ assertEquals("ip:port=0.0.0.0:6667", dataSet.getEndPoint().toString());
+ } finally {
+ QueryResourceManager.getInstance().endQuery(context.getQueryId());
+ }
+ }
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 20387de..9868e26 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -39,6 +39,7 @@ import java.util.Random;
public class SessionExample {
private static Session session;
+ private static Session sessionEnableRedirect;
private static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
private static final String ROOT_SG1_D1_S2 = "root.sg1.d1.s2";
private static final String ROOT_SG1_D1_S3 = "root.sg1.d1.s3";
@@ -77,6 +78,17 @@ public class SessionExample {
deleteTimeseries();
setTimeout();
session.close();
+
+ sessionEnableRedirect = new Session("127.0.0.1", 6667, "root", "root");
+ sessionEnableRedirect.setEnableQueryRedirection(true);
+ sessionEnableRedirect.open(false);
+
+ // set session fetchSize
+ sessionEnableRedirect.setFetchSize(10000);
+
+ insertRecord4Redirect();
+ query4Redirect();
+ sessionEnableRedirect.close();
}
private static void createTimeseries()
@@ -193,6 +205,31 @@ public class SessionExample {
}
}
+ private static void insertRecord4Redirect()
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < 6; i++) {
+ for (int j = 0; j < 2; j++) {
+ String deviceId = "root.redirect" + i + ".d" + j;
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ for (long time = 0; time < 5; time++) {
+ List<Object> values = new ArrayList<>();
+ values.add(1L + time);
+ values.add(2L + time);
+ values.add(3L + time);
+ session.insertRecord(deviceId, time, measurements, types, values);
+ }
+ }
+ }
+ }
+
private static void insertStrRecord()
throws IoTDBConnectionException, StatementExecutionException {
String deviceId = ROOT_SG1_D1;
@@ -448,6 +485,66 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
+ private static void query4Redirect()
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement("select * from root.redirect" + i + ".d1");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ "select * from root.redirect" + i + ".d1 where time >= 1 and time < 10");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ "select * from root.redirect"
+ + i
+ + ".d1 where time >= 1 and time < 10 align by device");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ for (int i = 0; i < 6; i++) {
+ SessionDataSet dataSet =
+ sessionEnableRedirect.executeQueryStatement(
+ "select * from root.redirect"
+ + i
+ + ".d1 where time >= 1 and time < 10 and root.redirect"
+ + i
+ + "d1.s1 > 1");
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+ }
+
private static void queryWithTimeout()
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index bcd5b05..a188db5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -42,6 +42,8 @@ public abstract class QueryPlan extends PhysicalPlan {
private Map<String, Integer> pathToIndex = new HashMap<>();
+ private boolean enableRedirect = false;
+
public QueryPlan() {
super(true);
setOperatorType(Operator.OperatorType.QUERY);
@@ -126,4 +128,12 @@ public abstract class QueryPlan extends PhysicalPlan {
throws IllegalPathException {
return columnForReader;
}
+
+ public boolean isEnableRedirect() {
+ return enableRedirect;
+ }
+
+ public void setEnableRedirect(boolean enableRedirect) {
+ this.enableRedirect = enableRedirect;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index f8ce3ea..b37a16e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
@@ -108,6 +109,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
this.dataSetType = DataSetType.QUERY;
this.rawDataQueryPlan = new RawDataQueryPlan();
this.rawDataQueryPlan.setAscending(alignByDevicePlan.isAscending());
+ // only redirect query for raw data query
+ this.rawDataQueryPlan.setEnableRedirect(alignByDevicePlan.isEnableRedirect());
}
this.curDataSetInitialized = false;
@@ -198,6 +201,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
throw new IOException(e);
}
+ if (currentDataSet.getEndPoint() != null) {
+ org.apache.iotdb.service.rpc.thrift.EndPoint endPoint =
+ new org.apache.iotdb.service.rpc.thrift.EndPoint();
+ endPoint.setIp(currentDataSet.getEndPoint().getIp());
+ endPoint.setPort(currentDataSet.getEndPoint().getPort());
+ throw new RedirectException(endPoint);
+ }
+
if (currentDataSet.hasNext()) {
curDataSetInitialized = true;
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 63f08c1..a707063 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -180,6 +180,17 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet
init();
}
+ /**
+ * dummy dataSet for redirect query
+ *
+ * @param queryId queryId for the query
+ */
+ public RawQueryDataSetWithoutValueFilter(long queryId) {
+ this.queryId = queryId;
+ blockingQueueArray = new BlockingQueue[0];
+ timeHeap = new TimeSelector(0, ascending);
+ }
+
private void init() throws IOException, InterruptedException {
timeHeap = new TimeSelector(seriesReaderList.size() << 1, ascending);
for (int i = 0; i < seriesReaderList.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 042934a..cb76e20 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -62,6 +62,10 @@ public class RawDataQueryExecutor {
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, QueryProcessException {
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
+ QueryDataSet dataSet = needRedirect(context.getQueryId(), null);
+ if (dataSet != null) {
+ return dataSet;
+ }
try {
return new RawQueryDataSetWithoutValueFilter(
context.getQueryId(),
@@ -80,6 +84,10 @@ public class RawDataQueryExecutor {
public final QueryDataSet executeNonAlign(QueryContext context)
throws StorageEngineException, QueryProcessException {
List<ManagedSeriesReader> readersOfSelectedSeries = initManagedSeriesReader(context);
+ QueryDataSet dataSet = needRedirect(context.getQueryId(), null);
+ if (dataSet != null) {
+ return dataSet;
+ }
return new NonAlignEngineDataSet(
context.getQueryId(),
queryPlan.getDeduplicatedPaths(),
@@ -142,6 +150,10 @@ public class RawDataQueryExecutor {
timestampGenerator.hasOrNode());
List<IReaderByTimestamp> readersOfSelectedSeries =
initSeriesReaderByTimestamp(context, queryPlan, cached);
+ QueryDataSet dataSet = needRedirect(context.getQueryId(), timestampGenerator);
+ if (dataSet != null) {
+ return dataSet;
+ }
return new RawQueryDataSetWithValueFilter(
queryPlan.getDeduplicatedPaths(),
queryPlan.getDeduplicatedDataTypes(),
@@ -196,4 +208,14 @@ public class RawDataQueryExecutor {
throws StorageEngineException {
return new ServerTimeGenerator(expression, context, queryPlan);
}
+
+ /**
+ * check whether need to redirect query to other node
+ *
+ * @param queryId queryId to cancel query
+ * @return dummyDataSet to avoid more cost, if null, no need
+ */
+ protected QueryDataSet needRedirect(long queryId, TimeGenerator timeGenerator) {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 031b2bc..95119c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -83,8 +83,10 @@ import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -531,7 +533,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: executeUpdateStatement(physicalPlan, req.getSessionId());
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
@@ -557,7 +560,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
req.timeout,
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
} catch (Exception e) {
@@ -582,7 +586,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
physicalPlan,
req.fetchSize,
config.getQueryTimeoutThreshold(),
- sessionIdUsernameMap.get(req.getSessionId()))
+ sessionIdUsernameMap.get(req.getSessionId()),
+ req.isEnableRedirectQuery())
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
} catch (Exception e) {
@@ -602,7 +607,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan plan,
int fetchSize,
long timeout,
- String username)
+ String username,
+ boolean enableRedirect)
throws QueryProcessException, SQLException, StorageEngineException,
QueryFilterOptimizationException, MetadataException, IOException, InterruptedException,
TException, AuthException {
@@ -644,8 +650,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (plan instanceof QueryPlan && !(plan instanceof UDFPlan)) {
resp = getQueryColumnHeaders(plan, username);
}
+ if (plan instanceof QueryPlan) {
+ ((QueryPlan) plan).setEnableRedirect(enableRedirect);
+ }
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
+
+ if (newDataSet.getEndPoint() != null && enableRedirect) {
+ // redirect query
+ LOGGER.debug(
+ "need to redirect {} {} to node {}", statement, queryId, newDataSet.getEndPoint());
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(
+ new EndPoint(newDataSet.getEndPoint().getIp(), newDataSet.getEndPoint().getPort()));
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(queryId);
+ return resp;
+ }
+
if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
resp = getListDataSetHeaders(newDataSet);
} else if (plan instanceof UDFPlan) {
@@ -662,7 +685,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (newDataSet instanceof DirectNonAlignDataSet) {
resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
} else {
- resp.setQueryDataSet(fillRpcReturnData(fetchSize, newDataSet, username));
+ try {
+ TSQueryDataSet tsQueryDataSet = fillRpcReturnData(fetchSize, newDataSet, username);
+ resp.setQueryDataSet(tsQueryDataSet);
+ } catch (RedirectException e) {
+ LOGGER.debug("need to redirect {} {} to {}", statement, queryId, e.getEndPoint());
+ if (enableRedirect) {
+ // redirect query
+ TSStatus status = new TSStatus();
+ status.setRedirectNode(e.getEndPoint());
+ status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ resp.setStatus(status);
+ resp.setQueryId(queryId);
+ return resp;
+ } else {
+ LOGGER.error(
+ "execute {} error, if session not support redirect, should not throw redirection exception",
+ statement,
+ e);
+ }
+ }
}
resp.setQueryId(queryId);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index e84a96a..15f3157 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.rpc;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import java.io.IOException;
import java.util.Map;
-public class RedirectException extends Exception {
+public class RedirectException extends IOException {
private final EndPoint endPoint;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b873ade..c3db5e4 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -94,6 +94,8 @@ public class Session {
protected Map<EndPoint, SessionConnection> endPointToSessionConnection;
private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>();
+ protected boolean enableQueryRedirection = false;
+
public Session(String host, int rpcPort) {
this(
host,
@@ -254,6 +256,7 @@ public class Session {
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader) {
@@ -452,7 +455,29 @@ public class Session {
*/
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
- return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ try {
+ logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+ return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ logger.debug(
+ "{} redirect query {} to {}",
+ defaultSessionConnection.getEndPoint(),
+ sql,
+ e.getEndPoint());
+ // retry
+ try {
+ return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ } catch (RedirectException redirectException) {
+ logger.error("{} redirect twice", sql, redirectException);
+ throw new StatementExecutionException(sql + " redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(
+ "raw data query do not support redirect, please confirm the session and server conf.");
+ }
+ }
}
/**
@@ -467,7 +492,24 @@ public class Session {
if (timeoutInMs < 0) {
throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
}
- return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ try {
+ return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ logger.debug("redirect query {} to {}", sql, e.getEndPoint());
+ // retry
+ try {
+ return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
+ } catch (RedirectException redirectException) {
+ logger.error("{} redirect twice", sql, redirectException);
+ throw new StatementExecutionException(sql + " redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(
+ "raw data query do not support redirect, please confirm the session and server conf.");
+ }
+ }
}
/**
@@ -493,7 +535,24 @@ public class Session {
*/
public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
throws StatementExecutionException, IoTDBConnectionException {
- return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ try {
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ } catch (RedirectException e) {
+ handleQueryRedirection(e.getEndPoint());
+ if (enableQueryRedirection) {
+ logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+ // retry
+ try {
+ return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
+ } catch (RedirectException redirectException) {
+ logger.error("Redirect twice", redirectException);
+ throw new StatementExecutionException("Redirect twice, please try again.");
+ }
+ } else {
+ throw new StatementExecutionException(
+ "raw data query do not support redirect, please confirm the session and server conf.");
+ }
+ }
}
/**
@@ -585,6 +644,14 @@ public class Session {
}
}
+ private void handleQueryRedirection(EndPoint endPoint) throws IoTDBConnectionException {
+ if (enableQueryRedirection) {
+ defaultSessionConnection.close();
+ defaultSessionConnection = constructSessionConnection(this, endPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ }
+ }
+
/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
@@ -1454,4 +1521,12 @@ public class Session {
throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
}
}
+
+ public boolean isEnableQueryRedirection() {
+ return enableQueryRedirection;
+ }
+
+ public void setEnableQueryRedirection(boolean enableQueryRedirection) {
+ this.enableQueryRedirection = enableQueryRedirection;
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 7a43483..f2b6665 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -69,6 +69,7 @@ public class SessionConnection {
private long statementId;
private ZoneId zoneId;
private EndPoint endPoint;
+ private boolean enableRedirect = false;
// TestOnly
public SessionConnection() {}
@@ -254,7 +255,11 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = null;
try {
- dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+ try {
+ dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
+ } catch (RedirectException e) {
+ throw new StatementExecutionException("need to redirect query, should not see this.", e);
+ }
return dataSet.hasNext();
} finally {
if (dataSet != null) {
@@ -264,13 +269,15 @@ public class SessionConnection {
}
protected SessionDataSet executeQueryStatement(String sql, long timeout)
- throws StatementExecutionException, IoTDBConnectionException {
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeout);
TSExecuteStatementResp execResp;
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
execResp = client.executeQueryStatement(execReq);
+ RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
@@ -304,6 +311,7 @@ public class SessionConnection {
throws IoTDBConnectionException, StatementExecutionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
RpcUtils.verifySuccess(execResp.getStatus());
} catch (TException e) {
@@ -322,13 +330,15 @@ public class SessionConnection {
}
protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
- throws StatementExecutionException, IoTDBConnectionException {
+ throws StatementExecutionException, IoTDBConnectionException, RedirectException {
TSRawDataQueryReq execReq =
new TSRawDataQueryReq(sessionId, paths, startTime, endTime, statementId);
execReq.setFetchSize(session.fetchSize);
TSExecuteStatementResp execResp;
try {
+ execReq.setEnableRedirectQuery(enableRedirect);
execResp = client.executeRawDataQuery(execReq);
+ RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
} catch (TException e) {
if (reconnect()) {
try {
@@ -660,4 +670,20 @@ public class SessionConnection {
}
return flag;
}
+
+ public boolean isEnableRedirect() {
+ return enableRedirect;
+ }
+
+ public void setEnableRedirect(boolean enableRedirect) {
+ this.enableRedirect = enableRedirect;
+ }
+
+ public EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 83cb70c..94892fa 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -117,6 +117,8 @@ struct TSExecuteStatementReq {
4: optional i32 fetchSize
5: optional i64 timeout
+
+ 6: optional bool enableRedirectQuery;
}
struct TSExecuteBatchStatementReq{
@@ -276,6 +278,7 @@ struct TSRawDataQueryReq {
4: required i64 startTime
5: required i64 endTime
6: required i64 statementId
+ 7: optional bool enableRedirectQuery;
}
struct TSCreateMultiTimeseriesReq {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index eb7a206..4f25739 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -34,6 +34,38 @@ public abstract class QueryDataSet {
protected int rowOffset = 0;
protected int alreadyReturnedRowNum = 0;
protected boolean ascending;
+ /*
+ * whether current data group has data for query.
+ * If not null(must be in cluster mode), we need to redirect the query to any data group which has some data to speed up query.
+ */
+ protected EndPoint endPoint = null;
+
+ /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */
+ public static class EndPoint {
+ private String ip = null;
+ private int port = 0;
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public String toString() {
+ return "ip:port=" + ip + ":" + port;
+ }
+ }
public QueryDataSet() {}
@@ -114,4 +146,12 @@ public abstract class QueryDataSet {
public boolean hasLimit() {
return rowLimit > 0;
}
+
+ public EndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ public void setEndPoint(EndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
}