You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/25 06:06:28 UTC
[incubator-iotdb] branch cluster_nodetool updated: add Query
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster_nodetool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_nodetool by this push:
new 99c2866 add Query
99c2866 is described below
commit 99c28669f838b68fe8c1e5acb1fb5d5b9b6ab784
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 25 14:05:38 2019 +0800
add Query
---
.../raft/processor/QueryMetricAsyncProcessor.java | 2 +-
.../iotdb/cluster/service/ClusterMonitor.java | 9 ++++-
.../iotdb/cluster/service/ClusterMonitorMBean.java | 13 ++++--
.../apache/iotdb/cluster/service/nodetool/Lag.java | 2 +-
.../iotdb/cluster/service/nodetool/NodeTool.java | 3 +-
.../service/nodetool/{Lag.java => Query.java} | 12 ++----
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 47 ++++++++++++++--------
7 files changed, 55 insertions(+), 33 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
index 5d4260b..a76d2a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
@@ -32,7 +32,7 @@ public class QueryMetricAsyncProcessor extends BasicAsyncUserProcessor<QueryMetr
String groupId = request.getGroupID();
QueryMetricResponse response = QueryMetricResponse.createSuccessResponse(groupId,
- RaftUtils.getMetric(request.getGroupID(), request.getMetric()));
+ RaftUtils.getReplicaMetric(request.getGroupID(), request.getMetric()));
response.addResult(true);
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
index 4532407..9dda178 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
@@ -100,7 +100,12 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
}
@Override
- public Map<String, Map<String, Integer>> getLogLagMap() {
- return RaftUtils.getLogLagMap();
+ public Map<String, Map<String, Integer>> getReplicaLagMap() {
+ return RaftUtils.getReplicaLagMap();
+ }
+
+ @Override
+ public Map<String, Integer> getQueryJobNumMap() {
+ return RaftUtils.getQueryJobNumMap();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
index dd7ec4a..d5a90c7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
@@ -70,9 +70,16 @@ public interface ClusterMonitorMBean {
Map<String[], String[]> getDataPartitonOfNode(String ip);
/**
- * Get log lag for metadata group and each data partition
+ * Get replica lag for metadata group and each data partition
*
- * @return key: groupId, value: log lag
+ * @return key: groupId, value: ip -> replica lag
*/
- Map<String, Map<String, Integer>> getLogLagMap();
+ Map<String, Map<String, Integer>> getReplicaLagMap();
+
+ /**
+ * Get number of query jobs for metadata group and each data partition
+ *
+ * @return key: groupId, value: number of query jobs
+ */
+ Map<String, Integer> getQueryJobNumMap();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
index 21d899b..67178cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
@@ -30,7 +30,7 @@ public class Lag extends NodeToolCmd {
@Override
public void execute(ClusterMonitorMBean proxy)
{
- Map<String, Map<String, Integer>> groupMap = proxy.getLogLagMap();
+ Map<String, Map<String, Integer>> groupMap = proxy.getReplicaLagMap();
for (Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) {
System.out.println(entry.getKey() + ":");
entry.getValue().forEach((node, lag) -> System.out.println("\t" + node + "\t->\t" + lag));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
index cca9c87..95dcd08 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
@@ -53,7 +53,8 @@ public class NodeTool {
Ring.class,
StorageGroup.class,
Host.class,
- Lag.class
+ Lag.class,
+ Query.class
);
Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
similarity index 71%
copy from cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
index 21d899b..2a49f86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
@@ -20,20 +20,16 @@ package org.apache.iotdb.cluster.service.nodetool;
import io.airlift.airline.Command;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.iotdb.cluster.service.nodetool.NodeTool.NodeToolCmd;
import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
-@Command(name = "lag", description = "Print log lag for all groups of connected host")
-public class Lag extends NodeToolCmd {
+@Command(name = "query", description = "Print number of query jobs for all groups of connected host")
+public class Query extends NodeToolCmd {
@Override
public void execute(ClusterMonitorMBean proxy)
{
- Map<String, Map<String, Integer>> groupMap = proxy.getLogLagMap();
- for (Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) {
- System.out.println(entry.getKey() + ":");
- entry.getValue().forEach((node, lag) -> System.out.println("\t" + node + "\t->\t" + lag));
- }
+ Map<String, Integer> queryNumMap = proxy.getQueryJobNumMap();
+ queryNumMap.forEach((groupId, num) -> System.out.println(groupId + "\t->\t" + num));
}
}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 9e1fd46..32302b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -66,7 +66,6 @@ import org.apache.iotdb.cluster.rpc.raft.response.QueryMetricResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.apache.iotdb.cluster.utils.hash.VirtualNode;
-import org.apache.iotdb.db.exception.ProcessorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -438,34 +437,40 @@ public class RaftUtils {
}
/**
- * Get log lag for metadata group and each data partition
+ * Get replica lag for metadata group and each data partition
*
- * @return key: groupId, value: log lag
+ * @return key: groupId, value: ip -> replica lag
*/
- public static Map<String, Map<String, Integer>> getLogLagMap() {
- return getMetricMap("log-lags");
+ public static Map<String, Map<String, Integer>> getReplicaLagMap() {
+ return getReplicaMetricMap("log-lags");
}
- public static Map<String, Map<String, Integer>> getMetricMap(String metric) {
+ public static Map<String, Map<String, Integer>> getReplicaMetricMap(String metric) {
Map<String, Map<String, Integer>> metricMap = new HashMap<>();
RaftService raftService = (RaftService) server.getMetadataHolder().getService();
- metricMap.put(raftService.getGroupId(), getMetricFromRaftService(raftService, metric));
+ metricMap.put(raftService.getGroupId(), getReplicaMetricFromRaftService(raftService, metric));
- server.getDataPartitionHolderMap().forEach(
- (k, v) -> metricMap.put(k, getMetricFromRaftService((RaftService) v.getService(), metric)));
+ router.getAllGroupId().forEach(groupId -> metricMap.put(groupId, getReplicaMetric(groupId, metric)));
return metricMap;
}
- public static Map<String, Integer> getMetric(String groupId, String metric) {
- RaftService service = (RaftService) server.getDataPartitionHolder(groupId).getService();
- return getMetricFromRaftService(service, metric);
+ public static Map<String, Integer> getReplicaMetric(String groupId, String metric) {
+ if (server.getDataPartitionHolderMap().containsKey(groupId)) {
+ RaftService service = (RaftService) server.getDataPartitionHolder(groupId).getService();
+ return getReplicaMetricFromRaftService(service, metric);
+ } else {
+ LOGGER.debug("Current host does not contain group {}, all groups are {}.", groupId, server.getDataPartitionHolderMap().keySet());
+ return getReplicaMetricFromRemoteNode(groupId, metric);
+ }
}
- private static Map<String, Integer> getMetricFromRaftService(RaftService service, String metric) {
+ private static Map<String, Integer> getReplicaMetricFromRaftService(RaftService service, String metric) {
String groupId = service.getGroupId();
+ LOGGER.debug("Get replica metric {} for group {}.", metric, service.getGroupId());
NodeImpl node = (NodeImpl) service.getNode();
Map<String, Integer> lagMap;
if (node.isLeader()) {
+ LOGGER.debug("Get metric locally.");
List<PeerId> nodes = service.getPeerIdList();
Map<String, Gauge> metrics = service.getNode().getNodeMetrics().getMetricRegistry()
.getGauges();
@@ -475,6 +480,7 @@ public class RaftUtils {
for (int i = 0; i < nodes.size(); i++) {
// leader doesn't have lag metric
if (nodes.get(i).equals(node.getServerId())) {
+ lagMap.put(nodes.get(i).getIp() + " (leader)", 0);
continue;
}
@@ -482,22 +488,25 @@ public class RaftUtils {
int value = -1;
if (metric.contains(key)) {
value = (int) metrics.get(key).getValue();
+ } else {
+ LOGGER.debug("Metric map {} should contain key {}, but not.", metrics, key);
}
- lagMap.put(nodes.get(i).toString(), value);
+ lagMap.put(nodes.get(i).getIp(), value);
}
} else {
- lagMap = getMetricFromRemoteNode(groupId, metric);
+ lagMap = getReplicaMetricFromRemoteNode(groupId, metric);
}
return lagMap;
}
- private static Map<String, Integer> getMetricFromRemoteNode(String groupId, String metric) {
+ private static Map<String, Integer> getReplicaMetricFromRemoteNode(String groupId, String metric) {
QueryMetricRequest request = new QueryMetricRequest(groupId,
config.getReadDataConsistencyLevel(), metric);
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.debug("Execute get metric for {} statement for group {}.", metric, groupId);
- PeerId holder = RaftUtils.getRandomPeerID(groupId);
+ PeerId holder = RaftUtils.getLeaderPeerID(groupId);
+ LOGGER.debug("Get metric from node {}.", holder);
try {
NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
/** Call async method **/
@@ -514,4 +523,8 @@ public class RaftUtils {
return null;
}
}
+
+ public static Map<String, Integer> getQueryJobNumMap() {
+ return null;
+ }
}