You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/14 00:21:13 UTC

git commit: [S4-110] Minor changes to tools. Tested twitter example

Updated Branches:
  refs/heads/S4-110-new 16e5353a0 -> 17d667d73


[S4-110] Minor changes to tools. Tested twitter example


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/17d667d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/17d667d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/17d667d7

Branch: refs/heads/S4-110-new
Commit: 17d667d73d5ad8bdec2858ee3487bc3cc9b6453e
Parents: 16e5353
Author: kishoreg <ki...@apache.org>
Authored: Wed Feb 13 16:21:05 2013 -0800
Committer: kishoreg <ki...@apache.org>
Committed: Wed Feb 13 16:21:05 2013 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/tools/Tools.java   |    2 +
 .../java/org/apache/s4/tools/helix/CreateTask.java |   15 ++---
 .../java/org/apache/s4/tools/helix/RemoveTask.java |   55 +++++++++++++++
 3 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17d667d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 11e2cb5..7bbaeb2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -34,6 +34,7 @@ import org.apache.s4.tools.helix.DeployApp;
 import org.apache.s4.tools.helix.GenericEventAdapter;
 import org.apache.s4.tools.helix.RebalanceTask;
 import org.apache.s4.tools.helix.ClusterStatus;
+import org.apache.s4.tools.helix.RemoveTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +60,7 @@ public class Tools {
         genericAdapter(null,GenericEventAdapter.class), 
         addNodes(null,AddNodes.class),
         createTask(null,CreateTask.class), 
+        removeTask(null,RemoveTask.class), 
         rebalanceTask(null,RebalanceTask.class);
       //formatter:on
         

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17d667d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
index 02a72d9..b41ae82 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -30,16 +30,16 @@ public class CreateTask extends S4ArgsBase {
 
         Tools.parseArgs(taskArgs, args);
         String msg = String.format(
-                "Setting up new task [{}] of type:[{}] for stream(s) on nodes belonging to node group {}",
-                taskArgs.taskId, taskArgs.taskType, taskArgs.streamName, taskArgs.nodeGroup);
+                "Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
+                taskArgs.taskId, taskArgs.streamName, taskArgs.nodeGroup);
         logger.info(msg);
         HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
         ConfigScopeBuilder builder = new ConfigScopeBuilder();
         ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
         Map<String, String> properties = new HashMap<String, String>();
+        properties.put("GROUP", taskArgs.nodeGroup);
         properties.put("type", "Task");
         properties.put("streamName", taskArgs.streamName);
-        properties.put("taskType", taskArgs.taskType);
         admin.setConfig(scope, properties);
         // A task is modeled as a resource in Helix
         admin.addResource(taskArgs.clusterName, taskArgs.taskId, taskArgs.numPartitions, "LeaderStandby",
@@ -56,7 +56,7 @@ public class CreateTask extends S4ArgsBase {
                 instancesInGroup.add(instanceName);
             }
         }
-        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1);
+        admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1,instancesInGroup);
         logger.info("Finished setting up task:" + taskArgs.taskId + "on nodes " + instancesInGroup);
     }
 
@@ -69,19 +69,16 @@ public class CreateTask extends S4ArgsBase {
         @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
         String clusterName;
 
-        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        @Parameter(names = { "-id", "-taskName" }, description = "name of the Task. Must be unique", required = true, arity = 1)
         String taskId;
 
-        @Parameter(names = { "-t", "-type" }, description = "producer/consumer", required = true, arity = 1)
-        String taskType;
-
         @Parameter(names = { "-p", "-partitions" }, description = "Parallelism/Number of Partition for the task", required = true, arity = 1)
         Integer numPartitions;
 
         @Parameter(names = { "-r", "standbys for each partition" }, description = "Number of Standby processors for each active processor", required = false, arity = 1)
         Integer numStandBys = 1;
 
-        @Parameter(names = { "-s", "-streams" }, description = "name of the stream(s) it produces/consumes.", required = true, arity = 1)
+        @Parameter(names = { "-s", "-stream" }, description = "name of the stream the pe listens to", required = true, arity = 1)
         String streamName;
 
         @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = false, arity = 1)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17d667d7/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
new file mode 100644
index 0000000..044e133
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
@@ -0,0 +1,55 @@
+package org.apache.s4.tools.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.apache.s4.tools.helix.CreateTask.CreateTaskArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/** Command-line tool that removes a task (a Helix resource) from an S4 cluster. */
+public class RemoveTask extends S4ArgsBase {
+
+    // Fixed: the logger was registered under CreateTask.class (copy-paste slip).
+    static Logger logger = LoggerFactory.getLogger(RemoveTask.class);
+
+    /** Parses {@code args}, then drops the resource named by -id from the cluster named by -c. */
+    public static void main(String[] args) {
+        RemoveTaskArgs taskArgs = new RemoveTaskArgs();
+        Tools.parseArgs(taskArgs, args);
+        // SLF4J substitutes "{}" itself; String.format ignores "{}" and logged the braces verbatim.
+        logger.info("Removing task [{}] from cluster [{}]", taskArgs.taskId, taskArgs.clusterName);
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        admin.dropResource(taskArgs.clusterName, taskArgs.taskId);
+        logger.info("Finished Removing task:" + taskArgs.taskId + " from cluster:"+ taskArgs.clusterName);
+    }
+
+    // NOTE(review): commandNames still reads "newStreamProcessor"; confirm nothing relies on it before renaming.
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Remove a task from the cluster")
+    static class RemoveTaskArgs extends S4ArgsBase {
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        String taskId;
+
+    }
+
+}