You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/04/18 10:20:34 UTC
[incubator-iotdb] 05/05: add Host
This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster_nodetool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d2c2b220dad1c85dbe4aaceced24781215611788
Author: mdf369 <95...@qq.com>
AuthorDate: Thu Apr 18 18:20:09 2019 +0800
add Host
---
.../cluster/entity/raft/MetadataStateManchine.java | 2 +-
.../iotdb/cluster/entity/raft/RaftService.java | 3 +
.../cluster/qp/executor/QueryMetadataExecutor.java | 16 ++---
.../iotdb/cluster/service/ClusterMonitor.java | 10 +++
.../iotdb/cluster/service/ClusterMonitorMBean.java | 4 ++
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 43 ++++++++++++
.../apache/iotdb/cluster/utils/hash/Router.java | 2 +
.../java/org/apache/iotdb/cli/service/Host.java | 76 ++++++++++++++++++++++
.../org/apache/iotdb/cli/service/NodeTool.java | 3 +-
.../cli/service/{Leader.java => StorageGroup.java} | 6 +-
.../java/org/apache/iotdb/db/metadata/MGraph.java | 2 +-
.../org/apache/iotdb/db/metadata/MManager.java | 2 +-
12 files changed, 150 insertions(+), 19 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 9592718..5b4a9e6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -148,7 +148,7 @@ public class MetadataStateManchine extends StateMachineAdapter {
mManager.setStorageLevelToMTree(sg);
}
- public Set<String> getAllStorageGroups() throws PathErrorException {
+ public Set<String> getAllStorageGroups() {
return mManager.getAllStorageGroup();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
index 1d08f09..7ef54bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
@@ -93,4 +93,7 @@ public class RaftService implements IService {
this.node = node;
}
+  public StateMachine getFsm() { // accessor: hands out the state machine kept in the fsm field
+    return fsm;
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 1dfbc7e..d003181 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -331,12 +331,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
MetadataRaftHolder metadataHolder = (MetadataRaftHolder) server.getMetadataHolder();
if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryStorageGroupResponse response;
- try {
- response = QueryStorageGroupResponse
- .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
- } catch (final PathErrorException e) {
- response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
- }
+ response = QueryStorageGroupResponse
+ .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
task.run(response);
} else {
((RaftService) metadataHolder.getService()).getNode()
@@ -346,12 +342,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
public void run(Status status, long index, byte[] reqCtx) {
QueryStorageGroupResponse response;
if (status.isOk()) {
- try {
- response = QueryStorageGroupResponse
- .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
- } catch (final PathErrorException e) {
- response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
- }
+ response = QueryStorageGroupResponse
+ .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
} else {
response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
index f17beeb..f73306b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitor.java
@@ -98,4 +98,14 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
}
return builder.toString();
}
+
+  @Override // NOTE(review): "Partiton" is misspelled, but the name is dictated by ClusterMonitorMBean
+  public Map<String[], String[]> getDataPartitonOfNode(String ip) {
+    return RaftUtils.getDataPartitionOfNode(ip); // pure delegation; RaftUtils supplies the port
+  }
+
+  @Override // explicit-port overload; name spelling ("Partiton") follows ClusterMonitorMBean
+  public Map<String[], String[]> getDataPartitonOfNode(String ip, int port) {
+    return RaftUtils.getDataPartitionOfNode(ip, port); // both arguments are forwarded unchanged
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
index 1f3dea7..65d4ae3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/ClusterMonitorMBean.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.service;
+import java.util.List;
import java.util.Map;
public interface ClusterMonitorMBean {
@@ -31,4 +32,7 @@ public interface ClusterMonitorMBean {
Map<String, String[]> getAllGroups();
String getDataPartitionOfSG(String sg);
+
+ Map<String[], String[]> getDataPartitonOfNode(String ip);
+ Map<String[], String[]> getDataPartitonOfNode(String ip, int port);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 978d06c..1b95ad5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -29,13 +29,19 @@ import com.alipay.sofa.jraft.util.Bits;
import com.alipay.sofa.jraft.util.OnlyForTest;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.raft.MetadataStateManchine;
import org.apache.iotdb.cluster.qp.callback.QPTask;
import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -50,6 +56,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.apache.iotdb.cluster.utils.hash.VirtualNode;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +66,7 @@ public class RaftUtils {
private static final Server server = Server.getInstance();
private static final Router router = Router.getInstance();
private static final AtomicInteger requestId = new AtomicInteger(0);
+ private static final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
/**
* The cache will be update in two case: 1. When @onLeaderStart() method of state machine is
@@ -354,4 +362,39 @@ public class RaftUtils {
}
return nodes;
}
+
+  public static Map<String[], String[]> getDataPartitionOfNode(String ip) { // ip-only overload
+    return getDataPartitionOfNode(ip, config.getPort()); // port taken from ClusterDescriptor's config
+  }
+
+  /**
+   * Groups storage groups by the groups that {@code router.getGroupsNodes(ip, port)} returns.
+   * NOTE(review): groups are looked up by array identity, which relies on the router returning
+   * the same array instance for a group on every call -- confirm.
+   *
+   * @return node ips of each group mapped to the storage groups routed to it (possibly none)
+   */
+  public static Map<String[], String[]> getDataPartitionOfNode(String ip, int port) {
+    Map<PhysicalNode[], List<String>> groupSGMap = new LinkedHashMap<>();
+    for (PhysicalNode[] group : router.getGroupsNodes(ip, port)) {
+      groupSGMap.put(group, new ArrayList<>());
+    }
+    Set<String> allSGList = ((MetadataStateManchine)((RaftService)server.getMetadataHolder().getService()).getFsm()).getAllStorageGroups();
+    for (String sg : allSGList) {
+      List<String> sgsOfGroup = groupSGMap.get(router.routeGroup(sg));
+      if (sgsOfGroup != null) {
+        sgsOfGroup.add(sg);
+      }
+    }
+    Map<String[], String[]> res = new LinkedHashMap<>();
+    for (Entry<PhysicalNode[], List<String>> entry : groupSGMap.entrySet()) {
+      PhysicalNode[] group = entry.getKey();
+      String[] ips = new String[group.length];
+      for (int j = 0; j < group.length; j++) {
+        ips[j] = group[j].ip;
+      }
+      res.put(ips, entry.getValue().toArray(new String[0])); // bare toArray() yields Object[]
+    }
+    return res;
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 2031676..e7fac4f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.cluster.utils.hash;
import com.alipay.sofa.jraft.util.OnlyForTest;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java
new file mode 100644
index 0000000..2093f21
--- /dev/null
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Host.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cli.service;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cli.service.NodeTool.NodeToolCmd;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
+
+@Command(name = "host", description = "Print all data partitions information which specific host belongs to")
+public class Host extends NodeToolCmd {
+
+  @Option(title = "ip", name = {"-i", "--ip"}, required = true, description = "Specify the host ip for accurate hosts information")
+  private String ip = null;
+
+  /** Left at -1 when the option is absent, which selects the ip-only MBean call below. */
+  @Option(title = "port", name = {"-p", "--port"}, description = "Specify the host port for accurate hosts information")
+  private int port = -1;
+
+  @Option(title = "sg_detail", name = {"-d", "--detail"}, description = "Show path of storage groups")
+  private boolean detail = false;
+
+  /**
+   * Prints one line per map entry: the key ips in parentheses, a tab-delimited arrow, then the
+   * storage group names (when {@code detail} is set) or just their count.
+   *
+   * @param proxy MBean proxy queried for the data partitions of the requested host
+   */
+  @Override
+  public void execute(ClusterMonitorMBean proxy) {
+    Map<String[], String[]> map =
+        port == -1 ? proxy.getDataPartitonOfNode(ip) : proxy.getDataPartitonOfNode(ip, port);
+
+    for (Entry<String[], String[]> entry : map.entrySet()) {
+      String[] sgs = entry.getValue();
+      StringBuilder builder = new StringBuilder(parenthesize(entry.getKey())).append("\t->\t");
+      if (detail) {
+        builder.append(parenthesize(sgs));
+      } else {
+        builder.append(sgs.length);
+      }
+      System.out.println(builder.toString());
+    }
+  }
+
+  /**
+   * Renders {@code items} as "(a, b, c)"; an empty array becomes "()". The separator is only
+   * written between elements, so there is never a trailing ", " that has to be cut off.
+   */
+  private static String parenthesize(String[] items) {
+    StringBuilder builder = new StringBuilder("(");
+    for (int i = 0; i < items.length; i++) {
+      builder.append(i == 0 ? "" : ", ").append(items[i]);
+    }
+    return builder.append(')').toString();
+  }
+}
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
index 7043269..58b79f2 100644
--- a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/NodeTool.java
@@ -54,7 +54,8 @@ public class NodeTool {
List<Class<? extends Runnable>> commands = newArrayList(
Help.class,
Ring.class,
- Leader.class
+ StorageGroup.class,
+ Host.class
);
Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");
diff --git a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
similarity index 89%
rename from iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java
rename to iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
index 897bfa2..1c112e3 100644
--- a/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/Leader.java
+++ b/iotdb-cli/src/main/java/org/apache/iotdb/cli/service/StorageGroup.java
@@ -24,10 +24,10 @@ import org.apache.iotdb.cli.service.NodeTool.NodeToolCmd;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.service.ClusterMonitorMBean;
-@Command(name = "leader", description = "Print leader host information of specific storage group")
-public class Leader extends NodeToolCmd {
+@Command(name = "storagegroup", description = "Print all hosts information of specific storage group")
+public class StorageGroup extends NodeToolCmd {
- @Arguments(description = "Specify a storage group for accurate leader information")
+ @Arguments(description = "Specify a storage group for accurate hosts information")
private String sg = null;
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
index c3a5f5a..26e5109 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MGraph.java
@@ -244,7 +244,7 @@ public class MGraph implements Serializable {
return new Metadata(seriesMap, deviceIdMap);
}
- public HashSet<String> getAllStorageGroup() throws PathErrorException {
+ public HashSet<String> getAllStorageGroup() {
return mtree.getAllStorageGroup();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 5a752f8..af768fd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -542,7 +542,7 @@ public class MManager {
*
* @return A HashSet instance which stores all storage group info
*/
- public Set<String> getAllStorageGroup() throws PathErrorException {
+ public Set<String> getAllStorageGroup() {
lock.readLock().lock();
try {