You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/19 13:26:55 UTC
[incubator-iotdb] branch cluster_read updated: fix issue of remote query finish
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new 833e277 fix issue of remote query finish
833e277 is described below
commit 833e277bd8b78a2630ec90eb61079e3c6005f2c1
Author: lta <li...@163.com>
AuthorDate: Fri Apr 19 21:26:36 2019 +0800
fix issue of remote query finish
---
.../ClusterRpcSingleQueryManager.java | 20 ++++++++++++++++++--
.../coordinatornode/ClusterSelectSeriesReader.java | 6 ++----
2 files changed, 20 insertions(+), 6 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 95f18b1..10c6e02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.QueryType;
@@ -225,7 +226,21 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
QuerySeriesDataByTimestampResponse response = ClusterRpcReaderUtils
.fetchBatchDataByTimestamp(groupId, queryNodes.get(groupId), taskId, queryRounds++,
batchTimestamp);
- handleFetchDataResponseForSelectPaths(fetchDataFilterSeries, response);
+ handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response);
+ }
+ }
+
+ /**
+ * Handle response of fetching data, and add batch data to corresponding reader.
+ */
+ private void handleFetchDataByTimestampResponseForSelectPaths(List<String> fetchDataSeries,
+ BasicQueryDataResponse response) {
+ List<BatchData> batchDataList = response.getSeriesBatchData();
+ for (int i = 0; i < fetchDataSeries.size(); i++) {
+ String series = fetchDataSeries.get(i);
+ BatchData batchData = batchDataList.get(i);
+ selectSeriesReaders.get(new Path(series))
+ .addBatchData(batchData, true);
}
}
@@ -238,7 +253,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
for (int i = 0; i < fetchDataSeries.size(); i++) {
String series = fetchDataSeries.get(i);
BatchData batchData = batchDataList.get(i);
- selectSeriesReaders.get(new Path(series)).addBatchData(batchData);
+ selectSeriesReaders.get(new Path(series))
+ .addBatchData(batchData, batchData.length() < ClusterConstant.BATCH_READ_SIZE);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
index a949034..e26a051 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
@@ -142,11 +142,9 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
this.currentBatchData = currentBatchData;
}
- public void addBatchData(BatchData batchData) {
+ public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
batchDataList.addLast(batchData);
- if (batchData.length() < ClusterConstant.BATCH_READ_SIZE) {
- remoteDataFinish = true;
- }
+ this.remoteDataFinish = remoteDataFinish;
}
public boolean isRemoteDataFinish() {