You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/19 13:26:55 UTC

[incubator-iotdb] branch cluster_read updated: fix issue of remote query finish

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new 833e277  fix issue of remote query finish
833e277 is described below

commit 833e277bd8b78a2630ec90eb61079e3c6005f2c1
Author: lta <li...@163.com>
AuthorDate: Fri Apr 19 21:26:36 2019 +0800

    fix issue of remote query finish
---
 .../ClusterRpcSingleQueryManager.java                | 20 ++++++++++++++++++--
 .../coordinatornode/ClusterSelectSeriesReader.java   |  6 ++----
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 95f18b1..10c6e02 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.QueryType;
@@ -225,7 +226,21 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
       QuerySeriesDataByTimestampResponse response = ClusterRpcReaderUtils
           .fetchBatchDataByTimestamp(groupId, queryNodes.get(groupId), taskId, queryRounds++,
               batchTimestamp);
-      handleFetchDataResponseForSelectPaths(fetchDataFilterSeries, response);
+      handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, response);
+    }
+  }
+
+  /**
+   * Handle response of fetching data, and add batch data to corresponding reader.
+   */
+  private void handleFetchDataByTimestampResponseForSelectPaths(List<String> fetchDataSeries,
+      BasicQueryDataResponse response) {
+    List<BatchData> batchDataList = response.getSeriesBatchData();
+    for (int i = 0; i < fetchDataSeries.size(); i++) {
+      String series = fetchDataSeries.get(i);
+      BatchData batchData = batchDataList.get(i);
+      selectSeriesReaders.get(new Path(series))
+          .addBatchData(batchData, true);
     }
   }
 
@@ -238,7 +253,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
     for (int i = 0; i < fetchDataSeries.size(); i++) {
       String series = fetchDataSeries.get(i);
       BatchData batchData = batchDataList.get(i);
-      selectSeriesReaders.get(new Path(series)).addBatchData(batchData);
+      selectSeriesReaders.get(new Path(series))
+          .addBatchData(batchData, batchData.length() < ClusterConstant.BATCH_READ_SIZE);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
index a949034..e26a051 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/ClusterSelectSeriesReader.java
@@ -142,11 +142,9 @@ public class ClusterSelectSeriesReader extends AbstractClusterPointReader implem
     this.currentBatchData = currentBatchData;
   }
 
-  public void addBatchData(BatchData batchData) {
+  public void addBatchData(BatchData batchData, boolean remoteDataFinish) {
     batchDataList.addLast(batchData);
-    if (batchData.length() < ClusterConstant.BATCH_READ_SIZE) {
-      remoteDataFinish = true;
-    }
+    this.remoteDataFinish = remoteDataFinish;
   }
 
   public boolean isRemoteDataFinish() {