You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/12 02:00:21 UTC
git commit: [s4-110] Fixing Tools to optionally support Helix
Updated Branches:
refs/heads/S4-110-new 0d2f441a0 -> 9690dd120
[s4-110] Fixing Tools to optionally support Helix
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/9690dd12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/9690dd12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/9690dd12
Branch: refs/heads/S4-110-new
Commit: 9690dd120a68013635fdd9c29bd1620bd71bb940
Parents: 0d2f441
Author: kishoreg <ki...@apache.org>
Authored: Mon Feb 11 18:00:10 2013 -0800
Committer: kishoreg <ki...@apache.org>
Committed: Mon Feb 11 18:00:10 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/s4/core/BaseModule.java | 9 +-
.../java/org/apache/s4/core/DefaultCoreModule.java | 2 -
.../main/java/org/apache/s4/core/S4Bootstrap.java | 2 +-
.../org/apache/s4/deploy/DeploymentManager.java | 1 -
.../java/org/apache/s4/deploy/DeploymentUtils.java | 2 +-
.../s4/deploy/HelixBasedDeploymentManager.java | 2 +-
.../src/main/java/org/apache/s4/tools/Tools.java | 45 +-
.../org/apache/s4/tools/helix/ClusterStatus.java | 470 +++++++++++++++
.../java/org/apache/s4/tools/helix/S4Status.java | 468 --------------
9 files changed, 513 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 7f0a539..6062305 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -17,6 +17,9 @@ import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
import org.apache.s4.deploy.AppStateModelFactory;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.deploy.HelixBasedDeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,12 +53,13 @@ public class BaseModule extends AbstractModule {
// share the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
- if (config.getBoolean("s4.helix")) {
+ String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
+ if (config.getBoolean("s4.helix") || "HELIX".equalsIgnoreCase(clusterManager)) {
bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromHelix.class);
bind(TaskStateModelFactory.class);
bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
- // bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+ bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
bind(Bootstrap.class).to(S4HelixBootstrap.class);
@@ -66,6 +70,7 @@ public class BaseModule extends AbstractModule {
// it is eager so that the node is able to join a cluster immediately
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromZK.class);
+ bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
bind(Bootstrap.class).to(S4Bootstrap.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index a7407e3..10cbbe5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -87,8 +87,6 @@ public class DefaultCoreModule extends AbstractModule {
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
- bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
-
bind(S4RLoaderFactory.class);
// For enabling checkpointing, one needs to use a custom module, such as
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index b3a5f3d..6063009 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -210,7 +210,7 @@ public class S4Bootstrap implements Bootstrap {
if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
logger.info("S4 node in standby until app class or app URI is specified");
}
- Server server = injector.getInstance(Server.class);
+ Server server = injector.getInstance(Server .class);
server.setInjector(injector);
DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
deploymentManager.deploy(appConfig);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
index 1b8994e..23b5d3d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
@@ -26,7 +26,6 @@ import org.apache.s4.core.util.AppConfig;
*
*/
public interface DeploymentManager {
- public static final String S4R_URI = "s4r_uri";
void start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
index fc42754..6d7de32 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -57,7 +57,7 @@ public class DeploymentUtils {
return;
}
}
- logger.info("{} value not set for {} : no application code will be downloaded", DeploymentManager.S4R_URI, appConfig.getAppName());
+ logger.info("{} value not set for {} : no application code will be downloaded", AppConfig.APP_URI, appConfig.getAppName());
return;
}
try {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index c5ba6ed..62a16a3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -30,7 +30,7 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
@Override
public void deploy(AppConfig appConfig) throws DeploymentFailedException {
- DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+ //DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index d25f7a7..11e2cb5 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
@@ -32,7 +33,7 @@ import org.apache.s4.tools.helix.CreateTask;
import org.apache.s4.tools.helix.DeployApp;
import org.apache.s4.tools.helix.GenericEventAdapter;
import org.apache.s4.tools.helix.RebalanceTask;
-import org.apache.s4.tools.helix.S4Status;
+import org.apache.s4.tools.helix.ClusterStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,24 +47,43 @@ public class Tools {
static Logger logger = LoggerFactory.getLogger(Tools.class);
enum Task {
- //deploy(Deploy.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(
- // null), newApp(CreateApp.class), s4r(Package.class), status(Status.class);
-
- deployApp(DeployApp.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(CreateCluster.class), genericAdapter(
- GenericEventAdapter.class), newApp(CreateApp.class), s4r(Package.class), status(S4Status.class),
- addNodes(AddNodes.class),createTask(
- CreateTask.class), rebalanceTask(RebalanceTask.class);
+ //formatter:off
+ zkServer(ZKServer.class),
+ newApp(CreateApp.class),
+ node(S4Node.class),
+ s4r(Package.class),
+ status(Status.class, ClusterStatus.class),
+ deploy(Deploy.class, DeployApp.class),
+ newCluster(DefineCluster.class, CreateCluster.class),
+ adapter(null),
+ genericAdapter(null,GenericEventAdapter.class),
+ addNodes(null,AddNodes.class),
+ createTask(null,CreateTask.class),
+ rebalanceTask(null,RebalanceTask.class);
+ //formatter:on
+
+ Class<?> zkTarget;
+ private Class<?> helixTarget;
- Class<?> target;
-
Task(Class<?> target) {
- this.target = target;
+ this(target,target);
+ }
+ Task(Class<?> zkTarget, Class<?> helixTarget) {
+ this.zkTarget = zkTarget;
+ this.helixTarget = helixTarget;
}
public void dispatch(String[] args) {
try {
+ String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
+ Class<?> target = zkTarget;
+ if("HELIX".equalsIgnoreCase(clusterManager)){
+ target= helixTarget;
+ }
+
Method main = target.getMethod("main", String[].class);
main.invoke(null, new Object[] { args });
+
} catch (Exception e) {
e.printStackTrace();
logger.error("Cannot dispatch to task [{}]: wrong arguments [{}]", this.name(), Arrays.toString(args));
@@ -71,11 +91,12 @@ public class Tools {
}
}
-
+
public static void main(String[] args) {
// configure log4j for Zookeeper
BasicConfigurator.configure();
+ org.apache.log4j.Logger.getLogger("org.apache.helix").setLevel(Level.ERROR);
org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
new file mode 100644
index 0000000..425e4fc
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.helix;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Maps;
+
+public class ClusterStatus extends S4ArgsBase {
+ static Logger logger = LoggerFactory.getLogger(ClusterStatus.class);
+
+ private static String NONE = "--";
+
+ public static void main(String[] args) {
+
+ StatusArgs statusArgs = new StatusArgs();
+ Tools.parseArgs(statusArgs, args);
+
+ try {
+ if (statusArgs.clusters.size() > 0) {
+ for (String cluster : statusArgs.clusters) {
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster, "ADMIN",
+ InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
+ manager.connect();
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ printClusterInfo(manager, cluster);
+ List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
+
+ List<String> apps = new ArrayList<String>();
+ List<String> tasks = new ArrayList<String>();
+
+ for (String resource : resourcesInCluster) {
+ ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
+ String resourceType = configAccessor.get(scope, "type");
+ if ("App".equals(resourceType)) {
+ apps.add(resource);
+ } else if ("Task".equals(resourceType)) {
+ tasks.add(resource);
+ }
+ }
+ if (statusArgs.apps == null && statusArgs.streams == null) {
+ statusArgs.apps = apps;
+ statusArgs.streams = tasks;
+ }
+ for (String app : statusArgs.apps) {
+ if (resourcesInCluster.contains(app)) {
+ printAppInfo(manager, cluster, app);
+ }
+ }
+
+ if (statusArgs.streams != null && statusArgs.streams.size() > 0) {
+ for (String stream : statusArgs.streams) {
+ if (resourcesInCluster.contains(stream)) {
+ printStreamInfo(manager, cluster, stream);
+ }
+ }
+ }
+ manager.disconnect();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Cannot get the status of S4", e);
+ }
+
+ }
+
+ private static void printStreamInfo(HelixManager manager, String cluster, String taskId) {
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
+ String streamName = configAccessor.get(scope, "streamName");
+ String taskType = configAccessor.get(scope, "taskType");
+
+ System.out.println("Task Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
+ inMiddle("Description", 90));
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
+ inMiddle(streamName + " " + taskType, 90));
+ System.out.println(generateEdge(130));
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
+ ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+ List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+ System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
+ System.out.println(generateEdge(130));
+ for (String partition : assignment.getPartitionSet()) {
+ Map<String, String> stateMap = view.getStateMap(partition);
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ for (String instance : stateMap.keySet()) {
+ sb.append(delim);
+ String state = stateMap.get(instance);
+ if (liveInstances.contains(instance)) {
+ sb.append(instance).append(":").append(state);
+ } else {
+ sb.append(instance).append(":").append("OFFLINE");
+ }
+ delim = ", ";
+ }
+ System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+ }
+ System.out.println(generateEdge(130));
+ }
+
+ private static void printAppInfo(HelixManager manager, String cluster, String app) {
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
+ String uri = configAccessor.get(scope, AppConfig.APP_URI);
+
+ System.out.println("App Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
+ System.out.println(generateEdge(130));
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
+ ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+ List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+ Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
+ Map<String, String> appStateMap = view.getStateMap(app);
+ System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
+ System.out.println(generateEdge(130));
+ for (String instance : assignmentMap.keySet()) {
+ String state = appStateMap.get(instance);
+ System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
+ inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+ }
+
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static void printClusterInfo(HelixManager manager, String cluster) {
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = dataAccessor.keyBuilder();
+ List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+ List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+ if (liveInstances == null) {
+ liveInstances = Collections.emptyList();
+ }
+ System.out.println("Cluster Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
+ inMiddle("Active", 10), generateEdge(80));
+ System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
+ inMiddle("Port", 8), inMiddle("Active", 10));
+ System.out.println(generateEdge(130));
+
+ System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
+ inMiddle("" + liveInstances.size(), 8));
+ boolean first = true;
+
+ for (String instance : instances) {
+ InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
+ // System.out.println(config);
+ if (first) {
+ first = false;
+ } else {
+ System.out.format("%n%-50s", " ");
+ }
+ System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
+ inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
+ inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
+ }
+
+ System.out.println();
+ }
+
+ @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+ static class StatusArgs extends S4ArgsBase {
+
+ @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+ List<String> apps;
+
+ @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
+ List<String> clusters;
+
+ @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+ List<String> streams;
+
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+ int timeout = 10000;
+ }
+
+ private static void showAppsStatus(List<Cluster> clusters) {
+ System.out.println("App Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+ System.out.println(generateEdge(130));
+ for (Cluster cluster : clusters) {
+ if (!NONE.equals(cluster.app.name)) {
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+ inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+ }
+ }
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static void showClustersStatus(List<Cluster> clusters) {
+ System.out.println("Cluster Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+ generateEdge(80));
+ System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+ inMiddle("Host", 50), inMiddle("Port", 8));
+ System.out.println(generateEdge(130));
+
+ for (Cluster cluster : clusters) {
+ System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+ inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+ inMiddle("" + cluster.nodes.size(), 8));
+ boolean first = true;
+ for (ClusterNode node : cluster.nodes) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.format("%n%-60s", " ");
+ }
+ System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+ inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+ }
+ System.out.println();
+ }
+ System.out.println(generateEdge(130));
+ }
+
+ private static void showStreamsStatus(List<Stream> streams) {
+ System.out.println("Stream Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+ inMiddle("Consumers", 55));
+ System.out.println(generateEdge(130));
+
+ for (Stream stream : streams) {
+ System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+ inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+ inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
+ }
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static String inMiddle(String content, int width) {
+ int i = (width - content.length()) / 2;
+ return String.format("%" + i + "s%s", " ", content);
+ }
+
+ private static String generateEdge(int length) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append("-");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * show as cluster1(app1), cluster2(app2)
+ *
+ * @param clusters
+ * cluster list
+ * @param clusterAppMap
+ * <cluster,app>
+ * @return
+ */
+ private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+ if (clusters == null || clusters.size() == 0) {
+ return NONE;
+ } else {
+ // show as: cluster1(app1), cluster2(app2)
+ StringBuilder sb = new StringBuilder();
+ for (String cluster : clusters) {
+ String app = clusterAppMap.get(cluster);
+ sb.append(cluster);
+ if (!NONE.equals(app)) {
+ sb.append("(").append(app).append(")");
+ }
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ }
+
+ static class Stream {
+
+ private final ZkClient zkClient;
+ private final String consumerPath;
+ private final String producerPath;
+
+ String streamName;
+ Set<String> producers = new HashSet<String>();// cluster name
+ Set<String> consumers = new HashSet<String>();// cluster name
+
+ Map<String, String> clusterAppMap = Maps.newHashMap();
+
+ public Stream(String streamName, ZkClient zkClient) throws Exception {
+ this.streamName = streamName;
+ this.zkClient = zkClient;
+ this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+ this.producerPath = "/s4/streams/" + streamName + "/producers";
+ readStreamFromZk();
+ }
+
+ private void readStreamFromZk() throws Exception {
+ List<String> consumerNodes = zkClient.getChildren(consumerPath);
+ for (String node : consumerNodes) {
+ ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+ consumers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ List<String> producerNodes = zkClient.getChildren(producerPath);
+ for (String node : producerNodes) {
+ ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+ producers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ getAppNames();
+ }
+
+ private void getAppNames() {
+ Set<String> clusters = new HashSet<String>(consumers);
+ clusters.addAll(producers);
+ for (String cluster : clusters) {
+ clusterAppMap.put(cluster, getApp(cluster, zkClient));
+ }
+ }
+
+ public boolean containsCluster(String cluster) {
+ if (producers.contains(cluster) || consumers.contains(cluster)) {
+ return true;
+ }
+ return false;
+ }
+
+ private static String getApp(String clusterName, ZkClient zkClient) {
+ String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ if (zkClient.exists(appPath)) {
+ ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+ return appRecord.getSimpleField("name");
+ }
+ return NONE;
+ }
+ }
+
+ static class App {
+ private String name = NONE;
+ private String cluster;
+ private String uri = NONE;
+ }
+
+ static class Cluster {
+ private final ZkClient zkClient;
+ private final String taskPath;
+ private final String processPath;
+ private final String appPath;
+
+ String clusterName;
+ int taskNumber;
+ App app;
+
+ List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+ public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+ this.clusterName = clusterName;
+ this.zkClient = zkClient;
+ this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ this.processPath = "/s4/clusters/" + clusterName + "/process";
+ this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ readClusterFromZk();
+ }
+
+ public void readClusterFromZk() throws Exception {
+ List<String> processes;
+ List<String> tasks;
+
+ tasks = zkClient.getChildren(taskPath);
+ processes = zkClient.getChildren(processPath);
+
+ taskNumber = tasks.size();
+
+ for (int i = 0; i < processes.size(); i++) {
+ ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+ if (process != null) {
+ int partition = Integer.parseInt(process.getSimpleField("partition"));
+ String host = process.getSimpleField("host");
+ int port = Integer.parseInt(process.getSimpleField("port"));
+ String taskId = process.getSimpleField("taskId");
+ ClusterNode node = new ClusterNode(partition, port, host, taskId);
+ nodes.add(node);
+ }
+ }
+
+ app = new App();
+ app.cluster = clusterName;
+ try {
+ ZNRecord appRecord = zkClient.readData(appPath);
+ app.name = appRecord.getSimpleField("name");
+ app.uri = appRecord.getSimpleField("s4r_uri");
+ } catch (ZkNoNodeException e) {
+ logger.warn(appPath + " doesn't exist");
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/9690dd12/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
deleted file mode 100644
index 079fcff..0000000
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.tools.helix;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigScope;
-import org.apache.helix.ConfigScopeBuilder;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
-import org.apache.s4.tools.S4ArgsBase;
-import org.apache.s4.tools.Tools;
-import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.collect.Maps;
-
-public class S4Status extends S4ArgsBase {
- static Logger logger = LoggerFactory.getLogger(S4Status.class);
-
- private static String NONE = "--";
-
- public static void main(String[] args) {
-
- StatusArgs statusArgs = new StatusArgs();
- Tools.parseArgs(statusArgs, args);
-
- try {
- if (statusArgs.clusters.size() > 0) {
- for (String cluster : statusArgs.clusters) {
- HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster, "ADMIN",
- InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
- manager.connect();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- printClusterInfo(manager, cluster);
- List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
-
- List<String> apps = new ArrayList<String>();
- List<String> tasks = new ArrayList<String>();
-
- for (String resource : resourcesInCluster) {
- ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
- String resourceType = configAccessor.get(scope, "type");
- if ("App".equals(resourceType)) {
- apps.add(resource);
- } else if ("Task".equals(resourceType)) {
- tasks.add(resource);
- }
- }
- if (statusArgs.apps == null && statusArgs.streams == null) {
- statusArgs.apps = apps;
- statusArgs.streams = tasks;
- }
- for (String app : statusArgs.apps) {
- if (resourcesInCluster.contains(app)) {
- printAppInfo(manager, cluster, app);
- }
- }
-
- if (statusArgs.streams != null && statusArgs.streams.size() > 0) {
- for (String stream : statusArgs.streams) {
- if (resourcesInCluster.contains(stream)) {
- printStreamInfo(manager, cluster, stream);
- }
- }
- }
- manager.disconnect();
- }
- }
- } catch (Exception e) {
- logger.error("Cannot get the status of S4", e);
- }
-
- }
-
- private static void printStreamInfo(HelixManager manager, String cluster, String taskId) {
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
- String streamName = configAccessor.get(scope, "streamName");
- String taskType = configAccessor.get(scope, "taskType");
-
- System.out.println("Task Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
- inMiddle("Description", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
- inMiddle(streamName + " " + taskType, 90));
- System.out.println(generateEdge(130));
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
- ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
- List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
- System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
- System.out.println(generateEdge(130));
- for (String partition : assignment.getPartitionSet()) {
- Map<String, String> stateMap = view.getStateMap(partition);
- StringBuilder sb = new StringBuilder();
- String delim = "";
- for (String instance : stateMap.keySet()) {
- sb.append(delim);
- String state = stateMap.get(instance);
- if (liveInstances.contains(instance)) {
- sb.append(instance).append(":").append(state);
- } else {
- sb.append(instance).append(":").append("OFFLINE");
- }
- delim = ", ";
- }
- System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
- }
- System.out.println(generateEdge(130));
- }
-
- private static void printAppInfo(HelixManager manager, String cluster, String app) {
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
- String uri = configAccessor.get(scope, "s4r_uri");
-
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
- System.out.println(generateEdge(130));
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
- ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
- List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
- Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
- Map<String, String> appStateMap = view.getStateMap(app);
- System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
- System.out.println(generateEdge(130));
- for (String instance : assignmentMap.keySet()) {
- String state = appStateMap.get(instance);
- System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
- inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
- }
-
- System.out.println(generateEdge(130));
-
- }
-
- private static void printClusterInfo(HelixManager manager, String cluster) {
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = dataAccessor.keyBuilder();
- List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
- List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
- if (liveInstances == null) {
- liveInstances = Collections.emptyList();
- }
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
- inMiddle("Active", 10), generateEdge(80));
- System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
- inMiddle("Port", 8), inMiddle("Active", 10));
- System.out.println(generateEdge(130));
-
- System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
- inMiddle("" + liveInstances.size(), 8));
- boolean first = true;
-
- for (String instance : instances) {
- InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
- // System.out.println(config);
- if (first) {
- first = false;
- } else {
- System.out.format("%n%-50s", " ");
- }
- System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
- inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
- inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
- }
-
- System.out.println();
- }
-
- @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
- static class StatusArgs extends S4ArgsBase {
-
- @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
- List<String> apps;
-
- @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
- List<String> clusters;
-
- @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
- List<String> streams;
-
- @Parameter(names = "-zk", description = "ZooKeeper connection string")
- String zkConnectionString = "localhost:2181";
-
- @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
- int timeout = 10000;
- }
-
- private static void showAppsStatus(List<Cluster> clusters) {
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- for (Cluster cluster : clusters) {
- if (!NONE.equals(cluster.app.name)) {
- System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
- inMiddle(cluster.app.cluster, 20), cluster.app.uri);
- }
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static void showClustersStatus(List<Cluster> clusters) {
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
- generateEdge(80));
- System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
- inMiddle("Host", 50), inMiddle("Port", 8));
- System.out.println(generateEdge(130));
-
- for (Cluster cluster : clusters) {
- System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
- inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
- inMiddle("" + cluster.nodes.size(), 8));
- boolean first = true;
- for (ClusterNode node : cluster.nodes) {
- if (first) {
- first = false;
- } else {
- System.out.format("%n%-60s", " ");
- }
- System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
- inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
- }
- System.out.println();
- }
- System.out.println(generateEdge(130));
- }
-
- private static void showStreamsStatus(List<Stream> streams) {
- System.out.println("Stream Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
- inMiddle("Consumers", 55));
- System.out.println(generateEdge(130));
-
- for (Stream stream : streams) {
- System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
- inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
- inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static String inMiddle(String content, int width) {
- int i = (width - content.length()) / 2;
- return String.format("%" + i + "s%s", " ", content);
- }
-
- private static String generateEdge(int length) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++) {
- sb.append("-");
- }
- return sb.toString();
- }
-
- /**
- * show as cluster1(app1), cluster2(app2)
- *
- * @param clusters
- * cluster list
- * @param clusterAppMap
- * <cluster,app>
- * @return
- */
- private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
- if (clusters == null || clusters.size() == 0) {
- return NONE;
- } else {
- // show as: cluster1(app1), cluster2(app2)
- StringBuilder sb = new StringBuilder();
- for (String cluster : clusters) {
- String app = clusterAppMap.get(cluster);
- sb.append(cluster);
- if (!NONE.equals(app)) {
- sb.append("(").append(app).append(")");
- }
- sb.append(" ");
- }
- return sb.toString();
- }
- }
-
- static class Stream {
-
- private final ZkClient zkClient;
- private final String consumerPath;
- private final String producerPath;
-
- String streamName;
- Set<String> producers = new HashSet<String>();// cluster name
- Set<String> consumers = new HashSet<String>();// cluster name
-
- Map<String, String> clusterAppMap = Maps.newHashMap();
-
- public Stream(String streamName, ZkClient zkClient) throws Exception {
- this.streamName = streamName;
- this.zkClient = zkClient;
- this.consumerPath = "/s4/streams/" + streamName + "/consumers";
- this.producerPath = "/s4/streams/" + streamName + "/producers";
- readStreamFromZk();
- }
-
- private void readStreamFromZk() throws Exception {
- List<String> consumerNodes = zkClient.getChildren(consumerPath);
- for (String node : consumerNodes) {
- ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
- consumers.add(consumer.getSimpleField("clusterName"));
- }
-
- List<String> producerNodes = zkClient.getChildren(producerPath);
- for (String node : producerNodes) {
- ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
- producers.add(consumer.getSimpleField("clusterName"));
- }
-
- getAppNames();
- }
-
- private void getAppNames() {
- Set<String> clusters = new HashSet<String>(consumers);
- clusters.addAll(producers);
- for (String cluster : clusters) {
- clusterAppMap.put(cluster, getApp(cluster, zkClient));
- }
- }
-
- public boolean containsCluster(String cluster) {
- if (producers.contains(cluster) || consumers.contains(cluster)) {
- return true;
- }
- return false;
- }
-
- private static String getApp(String clusterName, ZkClient zkClient) {
- String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- if (zkClient.exists(appPath)) {
- ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
- return appRecord.getSimpleField("name");
- }
- return NONE;
- }
- }
-
- static class App {
- private String name = NONE;
- private String cluster;
- private String uri = NONE;
- }
-
- static class Cluster {
- private final ZkClient zkClient;
- private final String taskPath;
- private final String processPath;
- private final String appPath;
-
- String clusterName;
- int taskNumber;
- App app;
-
- List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
- public Cluster(String clusterName, ZkClient zkClient) throws Exception {
- this.clusterName = clusterName;
- this.zkClient = zkClient;
- this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
- this.processPath = "/s4/clusters/" + clusterName + "/process";
- this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- readClusterFromZk();
- }
-
- public void readClusterFromZk() throws Exception {
- List<String> processes;
- List<String> tasks;
-
- tasks = zkClient.getChildren(taskPath);
- processes = zkClient.getChildren(processPath);
-
- taskNumber = tasks.size();
-
- for (int i = 0; i < processes.size(); i++) {
- ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
- if (process != null) {
- int partition = Integer.parseInt(process.getSimpleField("partition"));
- String host = process.getSimpleField("host");
- int port = Integer.parseInt(process.getSimpleField("port"));
- String taskId = process.getSimpleField("taskId");
- ClusterNode node = new ClusterNode(partition, port, host, taskId);
- nodes.add(node);
- }
- }
-
- app = new App();
- app.cluster = clusterName;
- try {
- ZNRecord appRecord = zkClient.readData(appPath);
- app.name = appRecord.getSimpleField("name");
- app.uri = appRecord.getSimpleField("s4r_uri");
- } catch (ZkNoNodeException e) {
- logger.warn(appPath + " doesn't exist");
- }
- }
-
- }
-
-}