You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/25 06:06:28 UTC

[incubator-iotdb] branch cluster_nodetool updated: add Query

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_nodetool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_nodetool by this push:
     new 99c2866  add Query
99c2866 is described below

commit 99c28669f838b68fe8c1e5acb1fb5d5b9b6ab784
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 25 14:05:38 2019 +0800

    add Query
---
 .../raft/processor/QueryMetricAsyncProcessor.java  |  2 +-
 .../iotdb/cluster/service/ClusterMonitor.java      |  9 ++++-
 .../iotdb/cluster/service/ClusterMonitorMBean.java | 13 ++++--
 .../apache/iotdb/cluster/service/nodetool/Lag.java |  2 +-
 .../iotdb/cluster/service/nodetool/NodeTool.java   |  3 +-
 .../service/nodetool/{Lag.java => Query.java}      | 12 ++----
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 47 ++++++++++++++--------
 7 files changed, 55 insertions(+), 33 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
index 5d4260b..a76d2a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/QueryMetricAsyncProcessor.java
@@ -32,7 +32,7 @@ public class QueryMetricAsyncProcessor extends BasicAsyncUserProcessor<QueryMetr
     String groupId = request.getGroupID();
 
     QueryMetricResponse response = QueryMetricResponse.createSuccessResponse(groupId,
-        RaftUtils.getMetric(request.getGroupID(), request.getMetric()));
+        RaftUtils.getReplicaMetric(request.getGroupID(), request.getMetric()));
     response.addResult(true);
     asyncContext.sendResponse(response);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
index 4532407..9dda178 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
@@ -100,7 +100,12 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
   }
 
   @Override
-  public Map<String, Map<String, Integer>> getLogLagMap() {
-    return RaftUtils.getLogLagMap();
+  public Map<String, Map<String, Integer>> getReplicaLagMap() {
+    return RaftUtils.getReplicaLagMap();
+  }
+
+  @Override
+  public Map<String, Integer> getQueryJobNumMap() {
+    return RaftUtils.getQueryJobNumMap();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
index dd7ec4a..d5a90c7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
@@ -70,9 +70,16 @@ public interface ClusterMonitorMBean {
   Map<String[], String[]> getDataPartitonOfNode(String ip);
 
   /**
-   * Get log lag for metadata group and each data partition
+   * Get replica lag for metadata group and each data partition
    *
-   * @return key: groupId, value: log lag
+   * @return key: groupId, value: ip -> replica lag
    */
-  Map<String, Map<String, Integer>> getLogLagMap();
+  Map<String, Map<String, Integer>> getReplicaLagMap();
+
+  /**
+   * Get number of query jobs for metadata group and each data partition
+   *
+   * @return key: groupId, value: number of query jobs
+   */
+  Map<String, Integer> getQueryJobNumMap();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
index 21d899b..67178cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
@@ -30,7 +30,7 @@ public class Lag extends NodeToolCmd {
   @Override
   public void execute(ClusterMonitorMBean proxy)
   {
-    Map<String, Map<String, Integer>> groupMap = proxy.getLogLagMap();
+    Map<String, Map<String, Integer>> groupMap = proxy.getReplicaLagMap();
     for (Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) {
       System.out.println(entry.getKey() + ":");
       entry.getValue().forEach((node, lag) -> System.out.println("\t" + node + "\t->\t" + lag));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
index cca9c87..95dcd08 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/NodeTool.java
@@ -53,7 +53,8 @@ public class NodeTool {
         Ring.class,
         StorageGroup.class,
         Host.class,
-        Lag.class
+        Lag.class,
+        Query.class
     );
 
     Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
similarity index 71%
copy from cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
index 21d899b..2a49f86 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Lag.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/nodetool/Query.java
@@ -20,20 +20,16 @@ package org.apache.iotdb.cluster.service.nodetool;
 
 import io.airlift.airline.Command;
 import java.util.Map;
-import java.util.Map.Entry;
 import org.apache.iotdb.cluster.service.nodetool.NodeTool.NodeToolCmd;
 import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
 
-@Command(name = "lag", description = "Print log lag for all groups of connected host")
-public class Lag extends NodeToolCmd {
+@Command(name = "query", description = "Print number of query jobs for all groups of connected host")
+public class Query extends NodeToolCmd {
 
   @Override
   public void execute(ClusterMonitorMBean proxy)
   {
-    Map<String, Map<String, Integer>> groupMap = proxy.getLogLagMap();
-    for (Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) {
-      System.out.println(entry.getKey() + ":");
-      entry.getValue().forEach((node, lag) -> System.out.println("\t" + node + "\t->\t" + lag));
-    }
+    Map<String, Integer> queryNumMap = proxy.getQueryJobNumMap();
+    queryNumMap.forEach((groupId, num) -> System.out.println(groupId + "\t->\t" + num));
   }
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 9e1fd46..32302b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -66,7 +66,6 @@ import org.apache.iotdb.cluster.rpc.raft.response.QueryMetricResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.apache.iotdb.cluster.utils.hash.VirtualNode;
-import org.apache.iotdb.db.exception.ProcessorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -438,34 +437,40 @@ public class RaftUtils {
   }
 
   /**
-   * Get log lag for metadata group and each data partition
+   * Get replica lag for metadata group and each data partition
    *
-   * @return key: groupId, value: log lag
+   * @return key: groupId, value: ip -> replica lag
    */
-  public static Map<String, Map<String, Integer>> getLogLagMap() {
-    return getMetricMap("log-lags");
+  public static Map<String, Map<String, Integer>> getReplicaLagMap() {
+    return getReplicaMetricMap("log-lags");
   }
 
-  public static Map<String, Map<String, Integer>> getMetricMap(String metric) {
+  public static Map<String, Map<String, Integer>> getReplicaMetricMap(String metric) {
     Map<String, Map<String, Integer>> metricMap = new HashMap<>();
     RaftService raftService = (RaftService) server.getMetadataHolder().getService();
-    metricMap.put(raftService.getGroupId(), getMetricFromRaftService(raftService, metric));
+    metricMap.put(raftService.getGroupId(), getReplicaMetricFromRaftService(raftService, metric));
 
-    server.getDataPartitionHolderMap().forEach(
-        (k, v) -> metricMap.put(k, getMetricFromRaftService((RaftService) v.getService(), metric)));
+    router.getAllGroupId().forEach(groupId -> metricMap.put(groupId, getReplicaMetric(groupId, metric)));
     return metricMap;
   }
 
-  public static Map<String, Integer> getMetric(String groupId, String metric) {
-    RaftService service = (RaftService) server.getDataPartitionHolder(groupId).getService();
-    return getMetricFromRaftService(service, metric);
+  public static Map<String, Integer> getReplicaMetric(String groupId, String metric) {
+    if (server.getDataPartitionHolderMap().containsKey(groupId)) {
+      RaftService service = (RaftService) server.getDataPartitionHolder(groupId).getService();
+      return getReplicaMetricFromRaftService(service, metric);
+    } else {
+      LOGGER.debug("Current host does not contain group {}, all groups are {}.", groupId, server.getDataPartitionHolderMap().keySet());
+      return getReplicaMetricFromRemoteNode(groupId, metric);
+    }
   }
 
-  private static Map<String, Integer> getMetricFromRaftService(RaftService service, String metric) {
+  private static Map<String, Integer> getReplicaMetricFromRaftService(RaftService service, String metric) {
     String groupId = service.getGroupId();
+    LOGGER.debug("Get replica metric {} for group {}.", metric, service.getGroupId());
     NodeImpl node = (NodeImpl) service.getNode();
     Map<String, Integer> lagMap;
     if (node.isLeader()) {
+      LOGGER.debug("Get metric locally.");
       List<PeerId> nodes = service.getPeerIdList();
       Map<String, Gauge> metrics = service.getNode().getNodeMetrics().getMetricRegistry()
           .getGauges();
@@ -475,6 +480,7 @@ public class RaftUtils {
       for (int i = 0; i < nodes.size(); i++) {
         // leader doesn't have lag metric
         if (nodes.get(i).equals(node.getServerId())) {
+          lagMap.put(nodes.get(i).getIp() + " (leader)", 0);
           continue;
         }
 
@@ -482,22 +488,25 @@ public class RaftUtils {
         int value = -1;
         if (metric.contains(key)) {
           value = (int) metrics.get(key).getValue();
+        } else {
+          LOGGER.debug("Metric map {} should contain key {}, but not.", metrics, key);
         }
-        lagMap.put(nodes.get(i).toString(), value);
+        lagMap.put(nodes.get(i).getIp(), value);
       }
     } else {
-      lagMap = getMetricFromRemoteNode(groupId, metric);
+      lagMap = getReplicaMetricFromRemoteNode(groupId, metric);
     }
     return lagMap;
   }
 
-  private static Map<String, Integer> getMetricFromRemoteNode(String groupId, String metric) {
+  private static Map<String, Integer> getReplicaMetricFromRemoteNode(String groupId, String metric) {
     QueryMetricRequest request = new QueryMetricRequest(groupId,
         config.getReadDataConsistencyLevel(), metric);
     SingleQPTask task = new SingleQPTask(false, request);
 
     LOGGER.debug("Execute get metric for {} statement for group {}.", metric, groupId);
-    PeerId holder = RaftUtils.getRandomPeerID(groupId);
+    PeerId holder = RaftUtils.getLeaderPeerID(groupId);
+    LOGGER.debug("Get metric from node {}.", holder);
     try {
       NodeAsClient client = RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
       /** Call async method **/
@@ -514,4 +523,8 @@ public class RaftUtils {
       return null;
     }
   }
+
+  public static Map<String, Integer> getQueryJobNumMap() {
+    return null;
+  }
 }