Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/04 04:40:11 UTC

[1/2] interim checkin. Can't get the bindings working

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
new file mode 100644
index 0000000..89361a3
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.helix;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class CreateCluster {
+
+    static Logger logger = LoggerFactory.getLogger(CreateCluster.class);
+
+    public static void main(String[] args) {
+
+        ZKServerArgs clusterArgs = new ZKServerArgs();
+        Tools.parseArgs(clusterArgs, args);
+        try {
+            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbNodes);
+            ZKHelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
+            helixAdmin.addCluster(clusterArgs.clusterName, false);
+            StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForOnlineOffline());
+            StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForLeaderStandby());
+
+            helixAdmin.addStateModelDef(clusterArgs.clusterName, "OnlineOffline", onlineofflinemodel);
+            helixAdmin.addStateModelDef(clusterArgs.clusterName, "LeaderStandby", leaderstandbymodel);
+            if (clusterArgs.nbNodes > 0) {
+                String[] split = clusterArgs.nodes.split(",");
+                int initialPort = clusterArgs.firstListeningPort;
+                for (int i = 0; i < clusterArgs.nbNodes; i++) {
+                    String host = "localhost";
+                    if (split.length == clusterArgs.nbNodes) {
+                        host = split[i].trim();
+                    }
+                    // Use the resolved host in the instance id so that ids follow the
+                    // host_port convention even when nodes run on remote machines.
+                    InstanceConfig instanceConfig = new InstanceConfig(host + "_" + initialPort);
+                    instanceConfig.setHostName(host);
+                    instanceConfig.setPort("" + initialPort);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
+                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    initialPort = initialPort + 1;
+                }
+            }
+            logger.info("New cluster configuration uploaded into zookeeper");
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+
+    }
+
+    @Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
+    static class ZKServerArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "-nbNodes", description = "number of S4 nodes for the cluster", required = false)
+        int nbNodes = 0;
+
+        @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
+        String nodes = "";
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required = false, arity = 1)
+        String nodeGroup = "default";
+
+    }
+
+}
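
Note: for readers unfamiliar with the Helix admin API, here is a minimal,
self-contained sketch of the same calls the tool above makes. The cluster name
"s4-test-cluster", port 12000 and node group "default" are illustrative, not
mandated by the tool, and a local ZooKeeper on localhost:2181 is assumed.

    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;
    import org.apache.helix.model.StateModelDefinition;
    import org.apache.helix.tools.StateModelConfigGenerator;

    public class CreateClusterSketch {
        public static void main(String[] args) {
            ZKHelixAdmin admin = new ZKHelixAdmin("localhost:2181");
            admin.addCluster("s4-test-cluster", false); // false: keep the cluster if it already exists

            // S4 apps are driven by the OnlineOffline state model, tasks by LeaderStandby.
            admin.addStateModelDef("s4-test-cluster", "OnlineOffline", new StateModelDefinition(
                    new StateModelConfigGenerator().generateConfigForOnlineOffline()));
            admin.addStateModelDef("s4-test-cluster", "LeaderStandby", new StateModelDefinition(
                    new StateModelConfigGenerator().generateConfigForLeaderStandby()));

            // Register a single node; the GROUP field is the node-group tag
            // that the other S4 tools filter on.
            InstanceConfig node = new InstanceConfig("localhost_12000");
            node.setHostName("localhost");
            node.setPort("12000");
            node.getRecord().setSimpleField("GROUP", "default");
            admin.addInstance("s4-test-cluster", node);
        }
    }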

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
new file mode 100644
index 0000000..02a72d9
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -0,0 +1,91 @@
+package org.apache.s4.tools.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class CreateTask extends S4ArgsBase {
+
+    static Logger logger = LoggerFactory.getLogger(CreateTask.class);
+
+    public static void main(String[] args) {
+        CreateTaskArgs taskArgs = new CreateTaskArgs();
+
+        Tools.parseArgs(taskArgs, args);
+        String msg = String.format(
+                "Setting up new task [%s] of type [%s] for stream(s) [%s] on nodes belonging to node group [%s]",
+                taskArgs.taskId, taskArgs.taskType, taskArgs.streamName, taskArgs.nodeGroup);
+        logger.info(msg);
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put("type", "Task");
+        properties.put("streamName", taskArgs.streamName);
+        properties.put("taskType", taskArgs.taskType);
+        admin.setConfig(scope, properties);
+        // A task is modeled as a resource in Helix
+        admin.addResource(taskArgs.clusterName, taskArgs.taskId, taskArgs.numPartitions, "LeaderStandby",
+                IdealStateModeProperty.AUTO.toString());
+        // This does the assignment of partition to nodes. It uses a modified
+        // version of consistent hashing to distribute active partitions and standbys
+        // equally among nodes.
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
+        }
+        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1);
+        logger.info("Finished setting up task:" + taskArgs.taskId + "on nodes " + instancesInGroup);
+    }
+
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class CreateTaskArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        String taskId;
+
+        @Parameter(names = { "-t", "-type" }, description = "producer/consumer", required = true, arity = 1)
+        String taskType;
+
+        @Parameter(names = { "-p", "-partitions" }, description = "Parallelism/Number of Partition for the task", required = true, arity = 1)
+        Integer numPartitions;
+
+        @Parameter(names = { "-r", "standbys for each partition" }, description = "Number of Standby processors for each active processor", required = false, arity = 1)
+        Integer numStandBys = 1;
+
+        @Parameter(names = { "-s", "-streams" }, description = "name of the stream(s) it produces/consumes.", required = true, arity = 1)
+        String streamName;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = false, arity = 1)
+        String nodeGroup = "default";
+
+    }
+}
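
Note: the file above encodes the key idea that a task is modeled as a Helix
resource. A condensed sketch of just that idea follows; the cluster, task and
stream names are illustrative, and a local ZooKeeper on localhost:2181 is
assumed.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.helix.ConfigScope;
    import org.apache.helix.ConfigScopeBuilder;
    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState.IdealStateModeProperty;

    public class CreateTaskSketch {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Task metadata lives in the resource-scoped configuration.
            ConfigScope scope = new ConfigScopeBuilder()
                    .forCluster("s4-test-cluster").forResource("words-counter").build();
            Map<String, String> props = new HashMap<String, String>();
            props.put("type", "Task");
            props.put("streamName", "words");
            props.put("taskType", "consumer");
            admin.setConfig(scope, props);

            // 4 partitions under the LeaderStandby model, AUTO placement;
            // replica count 2 = 1 leader (active) + 1 standby per partition.
            admin.addResource("s4-test-cluster", "words-counter", 4, "LeaderStandby",
                    IdealStateModeProperty.AUTO.toString());
            admin.rebalance("s4-test-cluster", "words-counter", 2);
        }
    }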

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
new file mode 100644
index 0000000..813f13d
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -0,0 +1,85 @@
+package org.apache.s4.tools.helix;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class DeployApp extends S4ArgsBase {
+    public static void main(String[] args) {
+        DeployAppArgs deployArgs = new DeployAppArgs();
+
+        Tools.parseArgs(deployArgs, args);
+
+        HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
+        properties.put("type", "App");
+        admin.setConfig(scope, properties);
+
+        IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
+        if (is == null) {
+            is = new IdealState(deployArgs.appName);
+        }
+        is.setNumPartitions(1);
+        is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+        is.setStateModelDefRef("OnlineOffline");
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(deployArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (nodeGroup.equals(deployArgs.nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
+        }
+        for (String instanceName : instancesInGroup) {
+            is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
+        }
+
+        admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
+    }
+
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class DeployAppArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = "-s4r", description = "Path to existing s4r file", required = true)
+        String s4rPath;
+
+        @Parameter(names = { "-generatedS4R", "-g" }, description = "Location of generated s4r (incompatible with -s4r option). By default, s4r is generated in a temporary directory on the local file system. In a distributed environment, you probably want to specify a location accessible through a distributed file system like NFS. That's the purpose of this option.", required = false)
+        String generatedS4R;
+
+        @Parameter(names = { "-appName" }, description = "Name of the App", required = true, arity = 1)
+        String appName;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the App needs to be deployed", required = false, arity = 1)
+        String nodeGroup = "default";
+
+    }
+}
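
Note: deployment boils down to a CUSTOMIZED-mode IdealState with one
"partition" (the app itself) pinned ONLINE on each node, which the sketch
below isolates. Names are illustrative; a local ZooKeeper is assumed.

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState;
    import org.apache.helix.model.IdealState.IdealStateModeProperty;

    public class DeployAppSketch {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            IdealState is = new IdealState("myApp");
            is.setNumPartitions(1);
            // CUSTOMIZED: we dictate the per-instance state map ourselves
            // instead of letting Helix compute an assignment.
            is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
            is.setStateModelDefRef("OnlineOffline");

            for (String instance : admin.getInstancesInCluster("s4-test-cluster")) {
                is.setPartitionState("myApp", instance, "ONLINE");
            }
            admin.setResourceIdealState("s4-test-cluster", "myApp", is);
        }
    }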

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
new file mode 100644
index 0000000..fddbd03
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -0,0 +1,72 @@
+package org.apache.s4.tools.helix;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.IdealState;
+import org.apache.s4.base.Event;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class GenericEventAdapter {
+
+    public static void main(String[] args) {
+        AdapterArgs adapterArgs = new AdapterArgs();
+
+        Tools.parseArgs(adapterArgs, args);
+        try {
+            String instanceName = "adapter";
+            HelixManager manager = HelixManagerFactory.getZKHelixManager(adapterArgs.clusterName, instanceName,
+                    InstanceType.SPECTATOR, adapterArgs.zkConnectionString);
+            ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
+                    30, 60);
+            manager.connect();
+            manager.addExternalViewChangeListener(cluster);
+
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(adapterArgs.streamName));
+            TCPEmitter emitter = new TCPEmitter(cluster, 1000, 1000);
+            // The serializer is loop-invariant: create it once, outside the send loop.
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            KryoSerDeser serializer = new KryoSerDeser(classLoader);
+            while (true) {
+                int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
+                Event event = new Event();
+                event.put("name", String.class, "Hello world to partition:" + partitionId);
+//                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+                System.out.println("Sending event to partition:" + partitionId);
+                emitter.send(partitionId, serializer.serialize(event));
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class AdapterArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
+        String streamName;
+
+    }
+
+}
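
Note: the adapter connects to Helix as a SPECTATOR (it observes the cluster
but owns no partitions). A sketch of that connection pattern on its own, with
illustrative names and a local ZooKeeper assumed:

    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.HelixManager;
    import org.apache.helix.HelixManagerFactory;
    import org.apache.helix.InstanceType;
    import org.apache.helix.PropertyKey.Builder;
    import org.apache.helix.model.IdealState;

    public class SpectatorSketch {
        public static void main(String[] args) throws Exception {
            HelixManager manager = HelixManagerFactory.getZKHelixManager(
                    "s4-test-cluster", "adapter", InstanceType.SPECTATOR, "localhost:2181");
            manager.connect();
            try {
                // Read the stream's ideal state to learn how many partitions
                // events can be dispatched to.
                HelixDataAccessor accessor = manager.getHelixDataAccessor();
                Builder keyBuilder = accessor.keyBuilder();
                IdealState is = accessor.getProperty(keyBuilder.idealStates("words"));
                System.out.println("partitions for stream 'words': " + is.getNumPartitions());
            } finally {
                manager.disconnect();
            }
        }
    }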

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
new file mode 100644
index 0000000..e4f4526
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
@@ -0,0 +1,61 @@
+package org.apache.s4.tools.helix;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class RebalanceTask extends S4ArgsBase {
+    public static void main(String[] args) {
+        RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
+
+        Tools.parseArgs(taskArgs, args);
+
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        // This does the assignment of partition to nodes. It uses a modified
+        // version of consistent hashing to distribute active partitions and standbys
+        // equally among nodes.
+        IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
+        }
+        admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
+    }
+
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class RebalanceTaskArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        String taskId;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
+        String nodeGroup = "default";
+
+    }
+}
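
Note: a trimmed sketch of the group-aware rebalance above: filter the
cluster's instances by their GROUP tag, then hand Helix the current ideal
state plus the allowed instance list. Names are illustrative.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState;
    import org.apache.helix.model.InstanceConfig;

    public class RebalanceSketch {
        public static void main(String[] args) {
            HelixAdmin admin = new ZKHelixAdmin("localhost:2181");

            // Keep only the instances tagged with the target node group.
            List<String> group = new ArrayList<String>();
            for (String name : admin.getInstancesInCluster("s4-test-cluster")) {
                InstanceConfig cfg = admin.getInstanceConfig("s4-test-cluster", name);
                if ("default".equals(cfg.getRecord().getSimpleField("GROUP"))) {
                    group.add(name);
                }
            }

            IdealState current = admin.getResourceIdealState("s4-test-cluster", "words-counter");
            admin.rebalance("s4-test-cluster", current, group);
        }
    }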

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
new file mode 100644
index 0000000..079fcff
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/S4Status.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.helix;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Maps;
+
+public class S4Status extends S4ArgsBase {
+    static Logger logger = LoggerFactory.getLogger(S4Status.class);
+
+    private static final String NONE = "--";
+
+    public static void main(String[] args) {
+
+        StatusArgs statusArgs = new StatusArgs();
+        Tools.parseArgs(statusArgs, args);
+
+        try {
+            if (statusArgs.clusters.size() > 0) {
+                for (String cluster : statusArgs.clusters) {
+                    HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster, "ADMIN",
+                            InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
+                    manager.connect();
+                    ConfigAccessor configAccessor = manager.getConfigAccessor();
+                    ConfigScopeBuilder builder = new ConfigScopeBuilder();
+                    printClusterInfo(manager, cluster);
+                    List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
+
+                    List<String> apps = new ArrayList<String>();
+                    List<String> tasks = new ArrayList<String>();
+
+                    for (String resource : resourcesInCluster) {
+                        ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
+                        String resourceType = configAccessor.get(scope, "type");
+                        if ("App".equals(resourceType)) {
+                            apps.add(resource);
+                        } else if ("Task".equals(resourceType)) {
+                            tasks.add(resource);
+                        }
+                    }
+                    if (statusArgs.apps == null && statusArgs.streams == null) {
+                        statusArgs.apps = apps;
+                        statusArgs.streams = tasks;
+                    }
+                    if (statusArgs.apps != null) {
+                        for (String app : statusArgs.apps) {
+                            if (resourcesInCluster.contains(app)) {
+                                printAppInfo(manager, cluster, app);
+                            }
+                        }
+                    }
+
+                    if (statusArgs.streams != null && statusArgs.streams.size() > 0) {
+                        for (String stream : statusArgs.streams) {
+                            if (resourcesInCluster.contains(stream)) {
+                                printStreamInfo(manager, cluster, stream);
+                            }
+                        }
+                    }
+                    manager.disconnect();
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Cannot get the status of S4", e);
+        }
+
+    }
+
+    private static void printStreamInfo(HelixManager manager, String cluster, String taskId) {
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
+        String streamName = configAccessor.get(scope, "streamName");
+        String taskType = configAccessor.get(scope, "taskType");
+
+        System.out.println("Task Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
+                inMiddle("Description", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
+                inMiddle(streamName + " " + taskType, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
+        System.out.println(generateEdge(130));
+        for (String partition : assignment.getPartitionSet()) {
+            Map<String, String> stateMap = view.getStateMap(partition);
+            StringBuilder sb = new StringBuilder();
+            String delim = "";
+            for (String instance : stateMap.keySet()) {
+                sb.append(delim);
+                String state = stateMap.get(instance);
+                if (liveInstances.contains(instance)) {
+                    sb.append(instance).append(":").append(state);
+                } else {
+                    sb.append(instance).append(":").append("OFFLINE");
+                }
+                delim = ", ";
+            }
+            System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+        }
+        System.out.println(generateEdge(130));
+    }
+
+    private static void printAppInfo(HelixManager manager, String cluster, String app) {
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
+        String uri = configAccessor.get(scope, "s4r_uri");
+
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
+        Map<String, String> appStateMap = view.getStateMap(app);
+        System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
+        System.out.println(generateEdge(130));
+        for (String instance : assignmentMap.keySet()) {
+            String state = appStateMap.get(instance);
+            System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
+                    inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+        }
+
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static void printClusterInfo(HelixManager manager, String cluster) {
+        HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = dataAccessor.keyBuilder();
+        List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+        List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+        if (liveInstances == null) {
+            liveInstances = Collections.emptyList();
+        }
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
+                inMiddle("Active", 10), generateEdge(80));
+        System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
+                inMiddle("Port", 8), inMiddle("Active", 10));
+        System.out.println(generateEdge(130));
+
+        System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
+                inMiddle("" + liveInstances.size(), 8));
+        boolean first = true;
+
+        for (String instance : instances) {
+            InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
+            // System.out.println(config);
+            if (first) {
+                first = false;
+            } else {
+                System.out.format("%n%-50s", " ");
+            }
+            System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
+                    inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
+                    inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
+        }
+
+        System.out.println();
+    }
+
+    @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+    static class StatusArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+        List<String> apps;
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
+        List<String> clusters;
+
+        @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+        List<String> streams;
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+        int timeout = 10000;
+    }
+
+    private static void showAppsStatus(List<Cluster> clusters) {
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        for (Cluster cluster : clusters) {
+            if (!NONE.equals(cluster.app.name)) {
+                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+            }
+        }
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static void showClustersStatus(List<Cluster> clusters) {
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+                generateEdge(80));
+        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+                inMiddle("Host", 50), inMiddle("Port", 8));
+        System.out.println(generateEdge(130));
+
+        for (Cluster cluster : clusters) {
+            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+                    inMiddle("" + cluster.nodes.size(), 8));
+            boolean first = true;
+            for (ClusterNode node : cluster.nodes) {
+                if (first) {
+                    first = false;
+                } else {
+                    System.out.format("%n%-60s", " ");
+                }
+                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+            }
+            System.out.println();
+        }
+        System.out.println(generateEdge(130));
+    }
+
+    private static void showStreamsStatus(List<Stream> streams) {
+        System.out.println("Stream Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+                inMiddle("Consumers", 55));
+        System.out.println(generateEdge(130));
+
+        for (Stream stream : streams) {
+            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
+        }
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static String inMiddle(String content, int width) {
+        // Guard against zero or negative padding: "%0s" is an illegal format width.
+        int i = Math.max(1, (width - content.length()) / 2);
+        return String.format("%" + i + "s%s", " ", content);
+    }
+
+    private static String generateEdge(int length) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            sb.append("-");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Formats clusters as: cluster1(app1) cluster2(app2)
+     *
+     * @param clusters
+     *            cluster list
+     * @param clusterAppMap
+     *            map of cluster name to app name
+     * @return the formatted string, or "--" when the cluster list is empty
+     */
+    private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+        if (clusters == null || clusters.size() == 0) {
+            return NONE;
+        } else {
+            // show as: cluster1(app1), cluster2(app2)
+            StringBuilder sb = new StringBuilder();
+            for (String cluster : clusters) {
+                String app = clusterAppMap.get(cluster);
+                sb.append(cluster);
+                if (!NONE.equals(app)) {
+                    sb.append("(").append(app).append(")");
+                }
+                sb.append(" ");
+            }
+            return sb.toString();
+        }
+    }
+
+    static class Stream {
+
+        private final ZkClient zkClient;
+        private final String consumerPath;
+        private final String producerPath;
+
+        String streamName;
+        Set<String> producers = new HashSet<String>();// cluster name
+        Set<String> consumers = new HashSet<String>();// cluster name
+
+        Map<String, String> clusterAppMap = Maps.newHashMap();
+
+        public Stream(String streamName, ZkClient zkClient) throws Exception {
+            this.streamName = streamName;
+            this.zkClient = zkClient;
+            this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+            this.producerPath = "/s4/streams/" + streamName + "/producers";
+            readStreamFromZk();
+        }
+
+        private void readStreamFromZk() throws Exception {
+            List<String> consumerNodes = zkClient.getChildren(consumerPath);
+            for (String node : consumerNodes) {
+                ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+                consumers.add(consumer.getSimpleField("clusterName"));
+            }
+
+            List<String> producerNodes = zkClient.getChildren(producerPath);
+            for (String node : producerNodes) {
+                ZNRecord producer = zkClient.readData(producerPath + "/" + node, true);
+                producers.add(producer.getSimpleField("clusterName"));
+            }
+
+            getAppNames();
+        }
+
+        private void getAppNames() {
+            Set<String> clusters = new HashSet<String>(consumers);
+            clusters.addAll(producers);
+            for (String cluster : clusters) {
+                clusterAppMap.put(cluster, getApp(cluster, zkClient));
+            }
+        }
+
+        public boolean containsCluster(String cluster) {
+            return producers.contains(cluster) || consumers.contains(cluster);
+        }
+
+        private static String getApp(String clusterName, ZkClient zkClient) {
+            String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            if (zkClient.exists(appPath)) {
+                ZNRecord appRecord = zkClient.readData(appPath);
+                return appRecord.getSimpleField("name");
+            }
+            return NONE;
+        }
+    }
+
+    static class App {
+        private String name = NONE;
+        private String cluster;
+        private String uri = NONE;
+    }
+
+    static class Cluster {
+        private final ZkClient zkClient;
+        private final String taskPath;
+        private final String processPath;
+        private final String appPath;
+
+        String clusterName;
+        int taskNumber;
+        App app;
+
+        List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+        public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+            this.clusterName = clusterName;
+            this.zkClient = zkClient;
+            this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+            this.processPath = "/s4/clusters/" + clusterName + "/process";
+            this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            readClusterFromZk();
+        }
+
+        public void readClusterFromZk() throws Exception {
+            List<String> processes;
+            List<String> tasks;
+
+            tasks = zkClient.getChildren(taskPath);
+            processes = zkClient.getChildren(processPath);
+
+            taskNumber = tasks.size();
+
+            for (int i = 0; i < processes.size(); i++) {
+                ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+                if (process != null) {
+                    int partition = Integer.parseInt(process.getSimpleField("partition"));
+                    String host = process.getSimpleField("host");
+                    int port = Integer.parseInt(process.getSimpleField("port"));
+                    String taskId = process.getSimpleField("taskId");
+                    ClusterNode node = new ClusterNode(partition, port, host, taskId);
+                    nodes.add(node);
+                }
+            }
+
+            app = new App();
+            app.cluster = clusterName;
+            try {
+                ZNRecord appRecord = zkClient.readData(appPath);
+                app.name = appRecord.getSimpleField("name");
+                app.uri = appRecord.getSimpleField("s4r_uri");
+            } catch (ZkNoNodeException e) {
+                logger.warn(appPath + " doesn't exist");
+            }
+        }
+
+    }
+
+}
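
Note: the core of the status logic is a three-way read: ideal state for the
partition list, external view for the actual replica states, and the
live-instance list to downgrade dead nodes to OFFLINE. A minimal sketch, with
illustrative cluster and task names and a local ZooKeeper assumed:

    import java.util.List;
    import java.util.Map;

    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.HelixManager;
    import org.apache.helix.HelixManagerFactory;
    import org.apache.helix.InstanceType;
    import org.apache.helix.PropertyKey.Builder;
    import org.apache.helix.model.ExternalView;
    import org.apache.helix.model.IdealState;

    public class StatusSketch {
        public static void main(String[] args) throws Exception {
            HelixManager manager = HelixManagerFactory.getZKHelixManager(
                    "s4-test-cluster", "ADMIN", InstanceType.ADMINISTRATOR, "localhost:2181");
            manager.connect();
            try {
                HelixDataAccessor accessor = manager.getHelixDataAccessor();
                Builder kb = accessor.keyBuilder();
                IdealState assignment = accessor.getProperty(kb.idealStates("words-counter"));
                ExternalView view = accessor.getProperty(kb.externalView("words-counter"));
                List<String> live = accessor.getChildNames(kb.liveInstances());

                for (String partition : assignment.getPartitionSet()) {
                    Map<String, String> states = view.getStateMap(partition);
                    if (states == null) {
                        continue; // no replica of this partition has reported yet
                    }
                    for (Map.Entry<String, String> e : states.entrySet()) {
                        // A state reported by a dead node is shown as OFFLINE.
                        String state = live.contains(e.getKey()) ? e.getValue() : "OFFLINE";
                        System.out.println(partition + " -> " + e.getKey() + ":" + state);
                    }
                }
            } finally {
                manager.disconnect();
            }
        }
    }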