You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/24 10:15:26 UTC
[incubator-iotdb] branch cluster_read updated: fix some bugs and
add statistic
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new e94ea3c fix some bugs and add statistic
e94ea3c is described below
commit e94ea3c62f9d4e2210dc14eeda9f2ecf075a0b90
Author: lta <li...@163.com>
AuthorDate: Wed Apr 24 18:15:10 2019 +0800
fix some bugs and add statistic
---
.../coordinatornode/ClusterRpcQueryManager.java | 13 +++++++++
.../ClusterRpcSingleQueryManager.java | 32 ++++++++++++++++++++--
.../coordinatornode/IClusterRpcQueryManager.java | 5 ++++
.../querynode/ClusterLocalQueryManager.java | 12 ++++++++
.../querynode/ClusterLocalSingleQueryManager.java | 9 +++++-
.../querynode/IClusterLocalQueryManager.java | 5 ++++
.../cluster/query/ClusterQueryLargeDataTest.java | 5 ----
7 files changed, 73 insertions(+), 8 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
index d20de8e..bd7d2d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.query.manager.coordinatornode;
import com.alipay.sofa.jraft.util.OnlyForTest;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -74,6 +76,17 @@ public class ClusterRpcQueryManager implements IClusterRpcQueryManager {
}
}
+ @Override
+ public Map<String, Integer> getAllReadUsage() {
+ Map<String, Integer> readerUsageMap = new HashMap<>();
+ SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
+ for(String groupId:singleQueryManager.getDataGroupUsage()) {
+ readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 1);
+ }
+ });
+ return readerUsageMap;
+ }
+
@OnlyForTest
public static ConcurrentHashMap<Long, String> getJobIdMapTaskId() {
return JOB_ID_MAP_TASK_ID;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index cdf4ded..fd7cfa6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.PathType;
@@ -34,7 +36,6 @@ import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeries
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
import org.apache.iotdb.cluster.query.utils.ClusterRpcReaderUtils;
import org.apache.iotdb.cluster.query.utils.QueryPlanPartitionUtils;
-import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicQueryDataResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
@@ -54,6 +55,11 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManager {
/**
+ * Statistic all usage of local data group.
+ */
+ private Set<String> dataGroupUsage = new HashSet<>();
+
+ /**
* Query job id assigned by ClusterRpcQueryManager
*/
private String taskId;
@@ -93,7 +99,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
// filter path resource
/**
- * Filter group entity group by data group, key is group id
+ * Filter group entity group by data group, key is group id(only contain remote group id)
*/
private Map<String, FilterGroupEntity> filterGroupEntityMap = new HashMap<>();
@@ -128,6 +134,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
*/
private void initSeriesReader(int readDataConsistencyLevel)
throws IOException, RaftConnectionException {
+ // Init all series with data group of select series,if filter series has the same data group, init them together.
for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
String groupId = entry.getKey();
QueryPlan queryPlan = entry.getValue();
@@ -147,12 +154,29 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
allQueryPlan, taskId, filterList);
handleInitReaderResponse(groupId, allQueryPlan, response);
} else {
+ dataGroupUsage.add(groupId);
selectSeriesByGroupId.remove(groupId);
if (filterGroupEntityMap.containsKey(groupId)) {
filterGroupEntityMap.remove(groupId);
}
}
}
+
+ //Init series reader with data groups of filter series, which don't exist in data groups list of select series.
+ for (Entry<String, FilterGroupEntity> entry : filterGroupEntityMap.entrySet()) {
+ String groupId = entry.getKey();
+ if (!selectPathPlans.containsKey(groupId)) {
+ PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
+ Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
+ FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
+ allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
+ List<Filter> filterList = filterGroupEntity.getFilters();
+ InitSeriesReaderResponse response = (InitSeriesReaderResponse) ClusterRpcReaderUtils
+ .createClusterSeriesReader(groupId, randomPeer, readDataConsistencyLevel,
+ allQueryPlan, taskId, filterList);
+ handleInitReaderResponse(groupId, allQueryPlan, response);
+ }
+ }
}
/**
@@ -318,6 +342,10 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
}
}
+ public Set<String> getDataGroupUsage() {
+ return dataGroupUsage;
+ }
+
public String getTaskId() {
return taskId;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
index c1b3506..4df2d52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.query.manager.coordinatornode;
+import java.util.Map;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -61,4 +62,8 @@ public interface IClusterRpcQueryManager {
*/
void releaseQueryResource(long jobId) throws RaftConnectionException;
+ /**
+ * Get all read usage count group by data group id, key is group id, value is usage count
+ */
+ Map<String, Integer> getAllReadUsage();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index fcb8a14..9cce813 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.cluster.query.manager.querynode;
import com.alipay.sofa.jraft.util.OnlyForTest;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
@@ -102,6 +104,16 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
}
}
+ @Override
+ public Map<String, Integer> getAllReadUsage() {
+ Map<String, Integer> readerUsageMap = new HashMap<>();
+ SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
+ String groupId = singleQueryManager.getGroupId();
+ readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 1);
+ });
+ return readerUsageMap;
+ }
+
@OnlyForTest
public static ConcurrentHashMap<String, Long> getTaskIdMapJobId() {
return TASK_ID_MAP_JOB_ID;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index c5ea7bf..abe0bd7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -62,6 +62,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryManager {
+ private String groupId;
+
/**
* Job id assigned by local QueryResourceManager
*/
@@ -104,7 +106,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
- InitSeriesReaderResponse response = new InitSeriesReaderResponse(request.getGroupID());
+ this.groupId = request.getGroupID();
+ InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
QueryContext context = new QueryContext(jobId);
Map<PathType, QueryPlan> queryPlanMap = request.getAllQueryPlan();
if (queryPlanMap.containsKey(PathType.SELECT_PATH)) {
@@ -266,6 +269,10 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
return filterReader.nextBatchList();
}
+ public String getGroupId() {
+ return groupId;
+ }
+
@Override
public void close() throws FileNodeManagerException {
QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
index 8e58794..cc0f103 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.query.manager.querynode;
import java.io.IOException;
+import java.util.Map;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -74,4 +75,8 @@ public interface IClusterLocalQueryManager {
*/
ClusterLocalSingleQueryManager getSingleQuery(String taskId);
+ /**
+ * Get all read usage count group by data group id, key is group id, value is usage count
+ */
+ Map<String, Integer> getAllReadUsage();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
deleted file mode 100644
index 6381f25..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.iotdb.cluster.query;
-
-public class ClusterQueryLargeDataTest {
-
-}