You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/24 10:15:26 UTC

[incubator-iotdb] branch cluster_read updated: fix some bugs and add statistic

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new e94ea3c  fix some bugs and add statistic
e94ea3c is described below

commit e94ea3c62f9d4e2210dc14eeda9f2ecf075a0b90
Author: lta <li...@163.com>
AuthorDate: Wed Apr 24 18:15:10 2019 +0800

    fix some bugs and add statistic
---
 .../coordinatornode/ClusterRpcQueryManager.java    | 13 +++++++++
 .../ClusterRpcSingleQueryManager.java              | 32 ++++++++++++++++++++--
 .../coordinatornode/IClusterRpcQueryManager.java   |  5 ++++
 .../querynode/ClusterLocalQueryManager.java        | 12 ++++++++
 .../querynode/ClusterLocalSingleQueryManager.java  |  9 +++++-
 .../querynode/IClusterLocalQueryManager.java       |  5 ++++
 .../cluster/query/ClusterQueryLargeDataTest.java   |  5 ----
 7 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
index d20de8e..bd7d2d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.cluster.query.manager.coordinatornode;
 
 import com.alipay.sofa.jraft.util.OnlyForTest;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -74,6 +76,17 @@ public class ClusterRpcQueryManager implements IClusterRpcQueryManager {
     }
   }
 
+  @Override
+  public Map<String, Integer> getAllReadUsage() {
+    Map<String, Integer> readerUsageMap = new HashMap<>();
+    SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
+      for(String groupId:singleQueryManager.getDataGroupUsage()) {
+        readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 1);
+      }
+    });
+    return readerUsageMap;
+  }
+
   @OnlyForTest
   public static ConcurrentHashMap<Long, String> getJobIdMapTaskId() {
     return JOB_ID_MAP_TASK_ID;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index cdf4ded..fd7cfa6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -23,9 +23,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.PathType;
@@ -34,7 +36,6 @@ import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeries
 import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.cluster.query.utils.ClusterRpcReaderUtils;
 import org.apache.iotdb.cluster.query.utils.QueryPlanPartitionUtils;
-import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicQueryDataResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
@@ -54,6 +55,11 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
 
   /**
+   * Statistic all usage of local data group.
+   */
+  private Set<String> dataGroupUsage = new HashSet<>();
+
+  /**
    * Query job id assigned by ClusterRpcQueryManager
    */
   private String taskId;
@@ -93,7 +99,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
 
   // filter path resource
   /**
-   * Filter group entity group by data group, key is group id
+   * Filter group entity group by data group, key is group id(only contain remote group id)
    */
   private Map<String, FilterGroupEntity> filterGroupEntityMap = new HashMap<>();
 
@@ -128,6 +134,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
    */
   private void initSeriesReader(int readDataConsistencyLevel)
       throws IOException, RaftConnectionException {
+    // Init all series with data group of select series,if filter series has the same data group, init them together.
     for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
       String groupId = entry.getKey();
       QueryPlan queryPlan = entry.getValue();
@@ -147,12 +154,29 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
                 allQueryPlan, taskId, filterList);
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else {
+        dataGroupUsage.add(groupId);
         selectSeriesByGroupId.remove(groupId);
         if (filterGroupEntityMap.containsKey(groupId)) {
           filterGroupEntityMap.remove(groupId);
         }
       }
     }
+
+    //Init series reader with data groups of filter series, which don't exist in data groups list of select series.
+    for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) {
+      String groupId = entry.getKey();
+      if (!selectPathPlans.containsKey(groupId)) {
+        PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
+        Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
+        FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
+        allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
+        List<Filter> filterList = filterGroupEntity.getFilters();
+        InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils
+            .createClusterSeriesReader(groupId, randomPeer, readDataConsistencyLevel,
+                allQueryPlan, taskId, filterList);
+        handleInitReaderResponse(groupId, allQueryPlan, response);
+      }
+    }
   }
 
   /**
@@ -318,6 +342,10 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
     }
   }
 
+  public Set<String> getDataGroupUsage() {
+    return dataGroupUsage;
+  }
+
   public String getTaskId() {
     return taskId;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
index c1b3506..4df2d52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.query.manager.coordinatornode;
 
+import java.util.Map;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 
@@ -61,4 +62,8 @@ public interface IClusterRpcQueryManager {
    */
   void releaseQueryResource(long jobId) throws RaftConnectionException;
 
+  /**
+   * Get all read usage count group by data group id, key is group id, value is usage count
+   */
+  Map<String, Integer> getAllReadUsage();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index fcb8a14..9cce813 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.cluster.query.manager.querynode;
 
 import com.alipay.sofa.jraft.util.OnlyForTest;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
@@ -102,6 +104,16 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
     }
   }
 
+  @Override
+  public Map<String, Integer> getAllReadUsage() {
+    Map<String, Integer> readerUsageMap = new HashMap<>();
+    SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
+      String groupId = singleQueryManager.getGroupId();
+      readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 1);
+    });
+    return readerUsageMap;
+  }
+
   @OnlyForTest
   public static ConcurrentHashMap<String, Long> getTaskIdMapJobId() {
     return TASK_ID_MAP_JOB_ID;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index c5ea7bf..abe0bd7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -62,6 +62,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryManager {
 
+  private String groupId;
+
   /**
    * Job id assigned by local QueryResourceManager
    */
@@ -104,7 +106,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
   @Override
   public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
       throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
-    InitSeriesReaderResponse response = new InitSeriesReaderResponse(request.getGroupID());
+    this.groupId = request.getGroupID();
+    InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
     QueryContext context = new QueryContext(jobId);
     Map<PathType, QueryPlan> queryPlanMap = request.getAllQueryPlan();
     if (queryPlanMap.containsKey(PathType.SELECT_PATH)) {
@@ -266,6 +269,10 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
     return filterReader.nextBatchList();
   }
 
+  public String getGroupId() {
+    return groupId;
+  }
+
   @Override
   public void close() throws FileNodeManagerException {
     QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
index 8e58794..cc0f103 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.cluster.query.manager.querynode;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -74,4 +75,8 @@ public interface IClusterLocalQueryManager {
    */
   ClusterLocalSingleQueryManager getSingleQuery(String taskId);
 
+  /**
+   * Get all read usage count group by data group id, key is group id, value is usage count
+   */
+  Map<String, Integer> getAllReadUsage();
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
deleted file mode 100644
index 6381f25..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.iotdb.cluster.query;
-
-public class ClusterQueryLargeDataTest {
-
-}