You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/18 10:20:34 UTC

[incubator-iotdb] 05/05: add Host

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster_nodetool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d2c2b220dad1c85dbe4aaceced24781215611788
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 18 18:20:09 2019 +0800

    add Host
---
 .../cluster/entity/raft/MetadataStateManchine.java |  2 +-
 .../iotdb/cluster/entity/raft/RaftService.java     |  3 +
 .../cluster/qp/executor/QueryMetadataExecutor.java | 16 ++---
 .../iotdb/cluster/service/ClusterMonitor.java      | 10 +++
 .../iotdb/cluster/service/ClusterMonitorMBean.java |  4 ++
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 43 ++++++++++++
 .../apache/iotdb/cluster/utils/hash/Router.java    |  2 +
 .../java/org/apache/iotdb/cli/service/Host.java    | 76 ++++++++++++++++++++++
 .../org/apache/iotdb/cli/service/NodeTool.java     |  3 +-
 .../cli/service/{Leader.java => StorageGroup.java} |  6 +-
 .../java/org/apache/iotdb/db/metadata/MGraph.java  |  2 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  2 +-
 12 files changed, 150 insertions(+), 19 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 9592718..5b4a9e6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -148,7 +148,7 @@ public class MetadataStateManchine extends StateMachineAdapter {
     mManager.setStorageLevelToMTree(sg);
   }
 
-  public Set<String> getAllStorageGroups() throws PathErrorException {
+  public Set<String> getAllStorageGroups() {
     return mManager.getAllStorageGroup();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
index 1d08f09..7ef54bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
@@ -93,4 +93,7 @@ public class RaftService implements IService {
     this.node = node;
   }
 
+  public StateMachine getFsm() {
+    return fsm;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 1dfbc7e..d003181 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -331,12 +331,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
     if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryStorageGroupResponse response;
-      try {
-        response = QueryStorageGroupResponse
-            .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
-      } catch (final PathErrorException e) {
-        response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
-      }
+      response = QueryStorageGroupResponse
+          .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
       task.run(response);
     } else {
       ((RaftService) metadataHolder.getService()).getNode()
@@ -346,12 +342,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
             public void run(Status status, long index, byte[] reqCtx) {
               QueryStorageGroupResponse response;
               if (status.isOk()) {
-                try {
-                  response = QueryStorageGroupResponse
-                      .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
-                } catch (final PathErrorException e) {
-                  response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
-                }
+                response = QueryStorageGroupResponse
+                    .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
               } else {
                 response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg());
               }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
index f17beeb..f73306b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
@@ -98,4 +98,14 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
     }
     return builder.toString();
   }
+
+  @Override
+  public Map<String[], String[]> getDataPartitonOfNode(String ip) {
+    return RaftUtils.getDataPartitionOfNode(ip);
+  }
+
+  @Override
+  public Map<String[], String[]> getDataPartitonOfNode(String ip, int port) {
+    return RaftUtils.getDataPartitionOfNode(ip, port);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
index 1f3dea7..65d4ae3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.service;
 
+import java.util.List;
 import java.util.Map;
 
 public interface ClusterMonitorMBean {
@@ -31,4 +32,7 @@ public interface ClusterMonitorMBean {
   Map<String, String[]> getAllGroups();
 
   String getDataPartitionOfSG(String sg);
+
+  Map<String[], String[]> getDataPartitonOfNode(String ip);
+  Map<String[], String[]> getDataPartitonOfNode(String ip, int port);
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 978d06c..1b95ad5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -29,13 +29,19 @@ import com.alipay.sofa.jraft.util.Bits;
 import com.alipay.sofa.jraft.util.OnlyForTest;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.raft.MetadataStateManchine;
 import org.apache.iotdb.cluster.qp.callback.QPTask;
 import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
 import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -50,6 +56,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.apache.iotdb.cluster.utils.hash.VirtualNode;
+import org.apache.iotdb.db.exception.PathErrorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +66,7 @@ public class RaftUtils {
   private static final Server server = Server.getInstance();
   private static final Router router = Router.getInstance();
   private static final AtomicInteger requestId = new AtomicInteger(0);
+  private static final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
 
   /**
    * The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
@@ -354,4 +362,39 @@ public class RaftUtils {
     }
     return nodes;
   }
+
+  public static Map<String[], String[]> getDataPartitionOfNode(String ip) {
+    return getDataPartitionOfNode(ip, config.getPort());
+  }
+
+  public static Map<String[], String[]> getDataPartitionOfNode(String ip, int port) {
+    PhysicalNode[][] groups = router.getGroupsNodes(ip, port);
+
+    Map<PhysicalNode[], List<String>> groupSGMap = new LinkedHashMap<>();
+    for (int i = 0; i < groups.length; i++) {
+      groupSGMap.put(groups[i], new ArrayList<>());
+    }
+    Set<String> allSGList = ((MetadataStateManchine)((RaftService)server.getMetadataHolder().getService()).getFsm()).getAllStorageGroups();
+    for (String sg : allSGList) {
+      PhysicalNode[] tempGroup = router.routeGroup(sg);
+      if (groupSGMap.containsKey(tempGroup)) {
+        groupSGMap.get(tempGroup).add(sg);
+      }
+    }
+
+    String[][] groupIps = new String[groups.length][];
+    for (int i = 0; i < groups.length; i++) {
+      groupIps[i] = new String[groups[i].length];
+      for (int j = 0; j < groups[i].length; j++) {
+        groupIps[i][j] = groups[i][j].ip;
+      }
+    }
+
+    Map<String[], String[]> res = new HashMap<>();
+    int index = 0;
+    for (Entry<PhysicalNode[], List<String>> entry : groupSGMap.entrySet()) {
+      res.put(groupIps[index], (String[]) entry.getValue().toArray());
+    }
+    return res;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 2031676..e7fac4f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.cluster.utils.hash;
 
 import com.alipay.sofa.jraft.util.OnlyForTest;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java
new file mode 100644
index 0000000..2093f21
--- /dev/null
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cli.service;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cli.service.NodeTool.NodeToolCmd;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
+
+@Command(name = "host", description = "Print all data partitions information which specific host belongs to")
+public class Host extends NodeToolCmd {
+
+  @Option(title = "ip", name = {"-i", "--ip"}, required = true, description = "Specify the host ip for accurate hosts information")
+  private String ip = null;
+
+  @Option(title = "port", name = {"-p", "--port"}, description = "Specify the host port for accurate hosts information")
+  private int port = -1;
+
+  @Option(title = "sg_detail", name = {"-d", "--detail"}, description = "Show path of storage groups")
+  private boolean detail = false;
+
+  @Override
+  public void execute(ClusterMonitorMBean proxy) {
+    Map<String[], String[]> map;
+    if (port == -1) {
+      map = proxy.getDataPartitonOfNode(ip);
+    } else {
+      map = proxy.getDataPartitonOfNode(ip, port);
+    }
+
+    for (Entry<String[], String[]> entry : map.entrySet()) {
+      StringBuilder builder = new StringBuilder();
+      String[] ips = entry.getKey();
+      String[] sgs = entry.getValue();
+      builder.append('(');
+      for (int i = 0; i < ips.length; i++) {
+        builder.append(ips[i]).append(", ");
+      }
+      builder.delete(builder.length() - 2, builder.length());
+      builder.append(')');
+
+      builder.append("\t->\t");
+      if (detail) {
+        builder.append('(');
+        for (int i = 0; i < sgs.length; i++) {
+          builder.append(sgs[i]).append(", ");
+        }
+        builder.delete(builder.length() - 2, builder.length());
+        builder.append(')');
+      } else {
+        builder.append(sgs.length);
+      }
+
+      System.out.println(builder.toString());
+    }
+  }
+}
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
index 7043269..58b79f2 100644
--- a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
@@ -54,7 +54,8 @@ public class NodeTool {
     List<Class<? extends Runnable>> commands = newArrayList(
         Help.class,
         Ring.class,
-        Leader.class
+        StorageGroup.class,
+        Host.class
     );
 
     Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
similarity index 89%
rename from iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java
rename to iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
index 897bfa2..1c112e3 100644
--- a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
@@ -24,10 +24,10 @@ import org.apache.iotdb.cli.service.NodeTool.NodeToolCmd;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
 
-@Command(name = "leader", description = "Print leader host information of specific storage group")
-public class Leader extends NodeToolCmd {
+@Command(name = "storagegroup", description = "Print all hosts information of specific storage group")
+public class StorageGroup extends NodeToolCmd {
 
-  @Arguments(description = "Specify a storage group for accurate leader information")
+  @Arguments(description = "Specify a storage group for accurate hosts information")
   private String sg = null;
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
index c3a5f5a..26e5109 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
@@ -244,7 +244,7 @@ public class MGraph implements Serializable {
     return new Metadata(seriesMap, deviceIdMap);
   }
 
-  public HashSet<String> getAllStorageGroup() throws PathErrorException {
+  public HashSet<String> getAllStorageGroup() {
     return mtree.getAllStorageGroup();
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 5a752f8..af768fd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -542,7 +542,7 @@ public class MManager {
    *
    * @return A HashSet instance which stores all storage group info
    */
-  public Set<String> getAllStorageGroup() throws PathErrorException {
+  public Set<String> getAllStorageGroup() {
 
     lock.readLock().lock();
     try {