You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/01/03 06:47:32 UTC

git commit: [S4-110] Adding commands to add new nodes and rebalance the task

Updated Branches:
  refs/heads/S4-110 6a851a314 -> ac0dc74b7


[S4-110] Adding commands to add new nodes and rebalance the task


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/ac0dc74b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/ac0dc74b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/ac0dc74b

Branch: refs/heads/S4-110
Commit: ac0dc74b75b659aa748f98f9f0d686912f831935
Parents: 6a851a3
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Jan 2 22:47:20 2013 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Jan 2 22:47:20 2013 -0800

----------------------------------------------------------------------
 README.md                                          |   18 ++-
 build.gradle                                       |    2 +-
 .../org/apache/s4/comm/helix/S4StateModel.java     |    6 +
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |   10 +-
 .../main/java/org/apache/s4/tools/AddNodes.java    |   88 ++++++++++
 .../java/org/apache/s4/tools/CreateCluster.java    |   97 +++++++++++
 .../main/java/org/apache/s4/tools/CreateTask.java  |  129 ++++++++-------
 .../main/java/org/apache/s4/tools/DeployApp.java   |   94 ++++++-----
 .../java/org/apache/s4/tools/RebalanceTask.java    |   62 +++++++
 .../src/main/java/org/apache/s4/tools/Tools.java   |   17 ++-
 10 files changed, 404 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index cf028ca..3045a3c 100644
--- a/README.md
+++ b/README.md
@@ -45,9 +45,9 @@ Create the cluster, -nbNodes is just the number of s4 nodes that will be run. Th
 
     ./s4 newCluster -c=cluster1 -nbNodes=2 -flp=12000
 
-Create a task that processes events from stream(names). -id can be anything but should be unique within a cluster, for now id and stream name needs to be the same. p is the number of partitions, so in this case it distributes 4 partitions among two nodes. -r is the number of replica/standby needed for each partition. Note that, when a node fails its load would be distributed among remaining nodes. So even though theoretically its possible to have number of standby's as the number of nodes, the performance would be horrible. In general this can be decided based on the head room available in the cluster.
+Create a task that processes events from stream(names). -id can be anything but should be unique within a cluster; for now, the id and the stream name need to be the same. -p is the number of partitions, so in this case it distributes 6 partitions among two nodes. -r is the number of replicas/standbys needed for each partition. Note that when a node fails, its load is distributed among the remaining nodes. So even though it is theoretically possible to have as many standbys as nodes, the performance would be horrible. In general this can be decided based on the headroom available in the cluster.
 
-    ./s4 createTask  -zk localhost:2181 -c cluster1 -id names -t consumer -p 4 -r 1 -s names
+    ./s4 createTask -c=cluster1 -id=names -t=consumer -p=6 -r=1 -s=names
 
 Generate a HelloWorld App
 
@@ -59,7 +59,7 @@ Deploy the App by providing the s4r. One can optionally provide the list of node
 
     ./s4 deployApp -c=cluster1 -s4r=/tmp/myApp/build/libs/myApp.s4r -appName=myApp -zk=localhost:2181
 
-Start the two s4 nodes in two separate windows. Note we now need to specify the node id while starting. This allows nodes to associated with same partitions when they are started. 
+Start the two s4 nodes in two separate windows. Note we now need to specify the node id while starting. This allows nodes to associate with the same partitions when they are re-started. 
 
     ./s4 node -c=cluster1 -zk=localhost:2181 -id=localhost_12000
     ./s4 node -c=cluster1 -zk=localhost:2181 -id=localhost_12001
@@ -67,11 +67,21 @@ Start the two s4 nodes in two separate windows. Note we now need to specify the
    
 Send some events to names stream. Notice that the partitions are divided among two nodes and each event is routed to appropriate node.
 
-    ./s4 adapter -c=cluster1 -zk=localhost:2181 -s=names
+    ./s4 genericAdapter -c=cluster1 -s=names
 
 Run the status tool to view the cluster state. It provide which nodes are up, what Apps are deployed, metadata about tasks like what stream is it processing how many partitions, which node is leader for each partition etc
 
     ./s4 status -c=cluster1
+    
+Add new nodes, deploy the app to new nodes and re-distribute the task amongst all nodes
+
+    ./s4 addNodes -c=cluster1 -nbNodes=1 -flp=12002 
+    ./s4 deployApp -c=cluster1 -s4r=/tmp/myApp/build/libs/myApp.s4r -appName=myApp -zk=localhost:2181
+    ./s4 rebalanceTask -c=cluster1 -id=names 
+    
+The partitions get re-distributed among 3 nodes. Run the status tool; it should show the new nodes and partition status.
+
+    ./s4 status -c=cluster1   
 
 Overview
 --------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ffbf2e3..1f962d8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,7 +84,7 @@ project.ext["libraries"] = [
     jcip:               'net.jcip:jcip-annotations:1.0',
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
-    helix:              'org.apache.helix:helix-core:0.6-incubating-SNAPSHOT',
+    helix:              'org.apache.helix:helix-core:0.6.1-incubating-SNAPSHOT',
     jackson_core_asl:   'org.codehaus.jackson:jackson-core-asl:1.8.5',  
     jackson_mapper_asl: 'org.codehaus.jackson:jackson-mapper-asl:1.8.5',  
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
index 642ce6c..9eebd06 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
@@ -51,5 +51,11 @@ public class S4StateModel extends StateModel
     logger.info("Transitioning from " + msg.getFromState() + " to "
         + msg.getToState() + "for " + msg.getPartitionName());
   }
+  /** Handles the OFFLINE to DROPPED transition; it only logs the name of the partition being dropped. */
+  @Transition(from = "OFFLINE", to = "DROPPED")
+  public void dropPartition(Message msg, NotificationContext context)
+  {
+    logger.info("Dropping partition " + msg.getPartitionName());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 5b0c606..952306a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -46,15 +46,7 @@ public class TaskSetup
   {
     if (isHelixEnabled)
     {
-      helixZkClient = new org.apache.helix.manager.zk.ZkClient(zookeeperAddress);
-      helixZkClient
-          .setZkSerializer(new org.apache.helix.manager.zk.ZNRecordSerializer());
-      if (!helixZkClient.waitUntilConnected(10, TimeUnit.SECONDS))
-      {
-        throw new RuntimeException(
-            "Could not connect to ZooKeeper after 10 seconds.");
-      }
-      helixAdmin = new ZKHelixAdmin(helixZkClient);
+      helixAdmin = new ZKHelixAdmin(zookeeperAddress);
     } else
     {
       zkclient = new ZkClient(zookeeperAddress);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
new file mode 100644
index 0000000..5bab316
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/** Command-line tool (s4 addNodes) that registers additional node instances in an existing S4 cluster through Helix. */
+public class AddNodes {
+
+    static Logger logger = LoggerFactory.getLogger(AddNodes.class);
+
+    /** Adds nbNodes instances on consecutive ports starting at the first listening port; failures are logged, not rethrown. */
+    public static void main(String[] args) {
+        ZKServerArgs clusterArgs = new ZKServerArgs();
+        Tools.parseArgs(clusterArgs, args);
+        try {
+            logger.info("Adding [{}] new node(s) to cluster [{}]", clusterArgs.nbNodes, clusterArgs.clusterName);
+            HelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
+            int initialPort = clusterArgs.firstListeningPort;
+            if (clusterArgs.nbNodes > 0) {
+                String[] split = clusterArgs.nodes.split(",");
+                // "".split(",") returns one empty entry, so an omitted -nodes option has to be detected explicitly;
+                // otherwise adding a single node would register an empty host name instead of localhost.
+                boolean useHostNames = clusterArgs.nodes.trim().length() > 0 && split.length == clusterArgs.nbNodes;
+                for (int i = 0; i < clusterArgs.nbNodes; i++) {
+                    InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
+                    String host = useHostNames ? split[i].trim() : "localhost";
+                    instanceConfig.setHostName(host);
+                    instanceConfig.setPort("" + initialPort);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
+                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    initialPort = initialPort + 1;
+                }
+            }
+            logger.info("New nodes configuration uploaded into zookeeper");
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+    }
+
+    /** Options of the addNodes command, written as name=value. */
+    @Parameters(commandNames = "s4 addNodes", separators = "=", commandDescription = "Add new nodes to an existing S4 cluster")
+    static class ZKServerArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "-nbNodes", description = "number of S4 nodes for the cluster", required = true)
+        int nbNodes = 1;
+
+        @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
+        String nodes = "";
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
+
+        @Parameter(names = {"-ng","-nodeGroup"}, description = "Assign the nodes to one or more groups. This will be useful when you create task", required=false)
+        String nodeGroup = "default";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
new file mode 100644
index 0000000..38fbbe8
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/** Command-line tool (s4 newCluster) that creates a Helix cluster, its state models and its initial node instances. */
+public class CreateCluster {
+
+    static Logger logger = LoggerFactory.getLogger(CreateCluster.class);
+
+    /** Creates the cluster and adds nbNodes instances on consecutive ports; failures are logged, not rethrown. */
+    public static void main(String[] args) {
+        ZKServerArgs clusterArgs = new ZKServerArgs();
+        Tools.parseArgs(clusterArgs, args);
+        try {
+            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbNodes);
+            ZKHelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
+            helixAdmin.addCluster(clusterArgs.clusterName, false);
+            // Register the OnlineOffline and LeaderStandby state model definitions under those names.
+            StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForOnlineOffline());
+            StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForLeaderStandby());
+            helixAdmin.addStateModelDef(clusterArgs.clusterName, "OnlineOffline", onlineofflinemodel);
+            helixAdmin.addStateModelDef(clusterArgs.clusterName, "LeaderStandby", leaderstandbymodel);
+            if (clusterArgs.nbNodes > 0) {
+                String[] split = clusterArgs.nodes.split(",");
+                // "".split(",") returns one empty entry, so an omitted -nodes option has to be detected explicitly;
+                // otherwise a single-node cluster would register an empty host name instead of localhost.
+                boolean useHostNames = clusterArgs.nodes.trim().length() > 0 && split.length == clusterArgs.nbNodes;
+                int initialPort = clusterArgs.firstListeningPort;
+                for (int i = 0; i < clusterArgs.nbNodes; i++) {
+                    InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
+                    String host = useHostNames ? split[i].trim() : "localhost";
+                    instanceConfig.setHostName(host);
+                    instanceConfig.setPort("" + initialPort);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
+                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    initialPort = initialPort + 1;
+                }
+            }
+            logger.info("New cluster configuration uploaded into zookeeper");
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+    }
+
+    /** Options of the newCluster command, written as name=value. */
+    @Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
+    static class ZKServerArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "-nbNodes", description = "number of S4 nodes for the cluster", required = false)
+        int nbNodes = 0;
+
+        @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
+        String nodes = "";
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false, arity = 1)
+        String nodeGroup = "default";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateTask.java
index 6b6593f..2a821e2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateTask.java
@@ -1,76 +1,89 @@
 package org.apache.s4.tools;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.tools.IdealStateCalculatorByShuffling;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class CreateTask extends S4ArgsBase
-{
-  public static void main(String[] args)
-  {
-    CreateTaskArgs taskArgs = new CreateTaskArgs();
-
-    Tools.parseArgs(taskArgs, args);
-
-    HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("type", "Task");
-    properties.put("streamName", taskArgs.streamName);
-    properties.put("taskType", taskArgs.taskType);
-    admin.setConfig(scope, properties);
-    // A task is modeled as a resource in Helix
-    admin.addResource(taskArgs.clusterName, taskArgs.taskId,
-        taskArgs.numPartitions, "LeaderStandby",
-        IdealStateModeProperty.AUTO.toString());
-    // This does the assignment of partition to nodes. It uses a modified
-    // version of consistent hashing to distribute active partitions and standbys
-    // equally among nodes.
-    admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1);
-    
-  }
-
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-  static class CreateTaskArgs extends S4ArgsBase
-  {
-
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
-
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
-    String clusterName;
-
-    @Parameter(names={"-id","-taskId"},description = "id of the task that produces/consumes a stream", required = true, arity = 1)
-    String taskId;
-
-    @Parameter(names={"-t","-type"}, description = "producer/consumer", required = true, arity = 1)
-    String taskType;
-
-    @Parameter(names={"-p", "-partitions"},description = "Parallelism/Number of Partition for the task", required = true, arity = 1)
-    Integer numPartitions;
-
-    @Parameter(names={"-r", "standbys for each partition"},description = "Number of Standby processors for each active processor", required = false, arity = 1)
-    Integer numStandBys = 1;
-
-    @Parameter(names={"-s", "-streams"}, description = "name of the stream(s) it produces/consumes.", required = true, arity = 1)
-    String streamName;
-
-    @Parameter(names={"-n", "-nodes"},description = "Node ids where the stream processor must run on. Optional. By default, the processing is divided equally among all nodes.", required = false, arity = -1)
-    List<String> nodeIds = Collections.emptyList();
-
-  }
+/** Command-line tool that registers a stream-processing task as a Helix resource of an S4 cluster. */
+public class CreateTask extends S4ArgsBase {
+    static Logger logger = LoggerFactory.getLogger(CreateTask.class);
+
+    /** Stores the task configuration, adds the task as a LeaderStandby resource and triggers the partition assignment. */
+    public static void main(String[] args) {
+        CreateTaskArgs taskArgs = new CreateTaskArgs();
+
+        Tools.parseArgs(taskArgs, args);
+        // "{}" placeholders are expanded by the SLF4J logger only; String.format would print them literally.
+        logger.info("Setting up new task [{}] of type:[{}] for stream(s) [{}] on nodes belonging to node group {}",
+                new Object[] { taskArgs.taskId, taskArgs.taskType, taskArgs.streamName, taskArgs.nodeGroup });
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put("type", "Task");
+        properties.put("streamName", taskArgs.streamName);
+        properties.put("taskType", taskArgs.taskType);
+        admin.setConfig(scope, properties);
+        // A task is modeled as a resource in Helix
+        admin.addResource(taskArgs.clusterName, taskArgs.taskId, taskArgs.numPartitions, "LeaderStandby",
+                IdealStateModeProperty.AUTO.toString());
+        // Collect the instances whose GROUP field matches the requested node group.
+        // NOTE(review): this list is only logged; the rebalance call below is not given it,
+        // so the actual placement may span other groups - confirm the intended behavior.
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (taskArgs.nodeGroup.equals(nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
+        }
+        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1);
+        logger.info("Finished setting up task:" + taskArgs.taskId + " on nodes " + instancesInGroup);
+    }
+
+    /** Options of the task creation command, written as name=value. */
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class CreateTaskArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        String taskId;
+
+        @Parameter(names = { "-t", "-type" }, description = "producer/consumer", required = true, arity = 1)
+        String taskType;
+
+        @Parameter(names = { "-p", "-partitions" }, description = "Parallelism/Number of Partition for the task", required = true, arity = 1)
+        Integer numPartitions;
+
+        @Parameter(names = { "-r", "standbys for each partition" }, description = "Number of Standby processors for each active processor", required = false, arity = 1)
+        Integer numStandBys = 1;
+
+        @Parameter(names = { "-s", "-streams" }, description = "name of the stream(s) it produces/consumes.", required = true, arity = 1)
+        String streamName;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = false, arity = 1)
+        String nodeGroup = "default";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index 337fc0f..ff1cb88 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -1,6 +1,7 @@
 package org.apache.s4.tools;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -13,65 +14,70 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.tools.IdealStateCalculatorByShuffling;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class DeployApp extends S4ArgsBase
-{
-  public static void main(String[] args)
-  {
-    DeployAppArgs deployArgs = new DeployAppArgs();
+public class DeployApp extends S4ArgsBase {
+    public static void main(String[] args) {
+        DeployAppArgs deployArgs = new DeployAppArgs();
 
-    Tools.parseArgs(deployArgs, args);
+        Tools.parseArgs(deployArgs, args);
 
-    HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
-    properties.put("type", "App");
-    admin.setConfig(scope, properties);
-    
-    IdealState is = new IdealState(deployArgs.appName);
-    is.setNumPartitions(1);
-    is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
-    is.setStateModelDefRef("OnlineOffline");
-    if (deployArgs.nodeIds.isEmpty())
-    {
-      List<String> instancesInCluster = admin
-          .getInstancesInCluster(deployArgs.clusterName);
-      for (String instanceName : instancesInCluster)
-      {
-        is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
-      }
+        HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
+        properties.put("type", "App");
+        admin.setConfig(scope, properties);
+
+        IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
+        if (is == null) {
+            is = new IdealState(deployArgs.appName);
+        }
+        is.setNumPartitions(1);
+        is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+        is.setStateModelDefRef("OnlineOffline");
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(deployArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if(nodeGroup.equals(deployArgs.nodeGroup)){
+                instancesInGroup.add(instanceName);
+            }
+        }
+        for(String instanceName:instancesInGroup){
+            is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
+        }
+        
+        admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
     }
-    admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
-  }
 
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-  static class DeployAppArgs extends S4ArgsBase
-  {
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class DeployAppArgs extends S4ArgsBase {
 
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
 
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
-    String clusterName;
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
 
-    @Parameter(names = "-s4r", description = "Path to existing s4r file", required = true)
-    String s4rPath;
+        @Parameter(names = "-s4r", description = "Path to existing s4r file", required = true)
+        String s4rPath;
 
-    @Parameter(names = { "-generatedS4R", "-g" }, description = "Location of generated s4r (incompatible with -s4r option). By default, s4r is generated in a temporary directory on the local file system. In a distributed environment, you probably want to specify a location accessible through a distributed file system like NFS. That's the purpose of this option.", required = false)
-    String generatedS4R;
+        @Parameter(names = { "-generatedS4R", "-g" }, description = "Location of generated s4r (incompatible with -s4r option). By default, s4r is generated in a temporary directory on the local file system. In a distributed environment, you probably want to specify a location accessible through a distributed file system like NFS. That's the purpose of this option.", required = false)
+        String generatedS4R;
 
-    @Parameter(names = { "-appName" }, description = "Name of the App", required = true, arity = 1)
-    String appName;
+        @Parameter(names = { "-appName" }, description = "Name of the App", required = true, arity = 1)
+        String appName;
 
-    @Parameter(names = { "-n", "-nodes" }, description = "Node ids where the stream processor must run on. Optional. By default, the processing is divided equally among all nodes.", required = false, arity = -1)
-    List<String> nodeIds = Collections.emptyList();
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the App needs to be deployed", required = false, arity = 1)
+        String nodeGroup = "default";
 
-  }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
new file mode 100644
index 0000000..1b8214f
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
@@ -0,0 +1,62 @@
+package org.apache.s4.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/** Command-line tool that rebalances the partitions of a task over the nodes of one node group. */
+public class RebalanceTask extends S4ArgsBase
+{
+  /** Parses the arguments, selects the instances of the requested node group and rebalances the task over them. */
+  public static void main(String[] args)
+  {
+    RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
+    Tools.parseArgs(taskArgs, args);
+
+    HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+    IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
+    List<String> instancesInGroup = new ArrayList<String>();
+    for (String instanceName : admin.getInstancesInCluster(taskArgs.clusterName)) {
+      InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+      // Compare from the argument side: an instance without a "GROUP" field yields null here
+      // and must simply be skipped instead of raising a NullPointerException.
+      if (taskArgs.nodeGroup.equals(instanceConfig.getRecord().getSimpleField("GROUP"))) {
+        instancesInGroup.add(instanceName);
+      }
+    }
+    // This does the assignment of partition to nodes. It uses a modified version of consistent
+    // hashing to distribute active partitions and standbys equally among nodes.
+    admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
+  }
+
+  /** Arguments of the rebalanceTask command. */
+  @Parameters(commandNames = "rebalanceTask", separators = "=", commandDescription = "Rebalance a task across the nodes of a node group")
+  static class RebalanceTaskArgs extends S4ArgsBase
+  {
+    @Parameter(names = "-zk", description = "ZooKeeper connection string")
+    String zkConnectionString = "localhost:2181";
+
+    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+    String clusterName;
+
+    @Parameter(names={"-id","-taskId"},description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+    String taskId;
+
+    // Optional: falls back to the "default" node group, like the same option of DeployApp.
+    @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = false, arity = 1)
+    String nodeGroup = "default";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ac0dc74b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index ef36f8f..192869a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,9 +38,20 @@ public class Tools {
 
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
-    enum Task { deployApp(DeployApp.class),
-        deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), genericAdapter(GenericEventAdapter.class),newApp(
-                CreateApp.class), s4r(Package.class), status(S4Status.class),createTask(CreateTask.class);
+    enum Task { 
+        deployApp(DeployApp.class),
+        deploy(Deploy.class), 
+        node(Main.class), 
+        addNodes(AddNodes.class), 
+        zkServer(ZKServer.class), 
+        newCluster(CreateCluster.class), 
+        adapter(null), 
+        genericAdapter(GenericEventAdapter.class),
+        newApp(CreateApp.class), 
+        s4r(Package.class), 
+        status(S4Status.class),
+        createTask(CreateTask.class), 
+        rebalanceTask(RebalanceTask.class);
 
         Class<?> target;