You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/02/22 17:56:14 UTC
[1/3] git commit: Remove NodeGroups, reuse existing s4 cluster ids
Remove NodeGroups, reuse existing s4 cluster ids
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/36acbce8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/36acbce8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/36acbce8
Branch: refs/heads/S4-110-new
Commit: 36acbce8c7b55d2b33791bcb3219f724ca5bbbf5
Parents: 0b93cae
Author: Daniel Gómez Ferro <da...@yahoo-inc.com>
Authored: Fri Feb 22 12:31:56 2013 +0100
Committer: Daniel Gómez Ferro <da...@yahoo-inc.com>
Committed: Fri Feb 22 12:31:56 2013 +0100
----------------------------------------------------------------------
.../org/apache/s4/comm/helix/S4HelixConstants.java | 6 +++
.../apache/s4/comm/topology/ClusterFromHelix.java | 4 +-
.../java/org/apache/s4/core/S4HelixBootstrap.java | 7 ++-
.../java/org/apache/s4/tools/helix/AddNodes.java | 16 ++++----
.../org/apache/s4/tools/helix/ClusterStatus.java | 4 +-
.../org/apache/s4/tools/helix/CreateCluster.java | 31 ++++++++-------
.../java/org/apache/s4/tools/helix/CreateTask.java | 20 ++++-----
.../java/org/apache/s4/tools/helix/DeployApp.java | 16 +++----
.../org/apache/s4/tools/helix/RebalanceTask.java | 14 +++----
9 files changed, 62 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
new file mode 100644
index 0000000..9bf3fc6
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4HelixConstants.java
@@ -0,0 +1,6 @@
+package org.apache.s4.comm.helix;
+
+public class S4HelixConstants {
+ public static final String HELIX_CLUSTER_NAME = "S4";
+ public static final String GLOBAL_TASK_NAME = "GLOBAL";
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index d14bded..8a77606 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
@@ -39,6 +40,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.base.Destination;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.tcp.TCPDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +149,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
if (externalViewList != null) {
for (ExternalView extView : externalViewList) {
String resource = extView.getId();
- ConfigScope resourceScope = builder.forCluster(clusterName)
+ ConfigScope resourceScope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME)
.forResource(resource).build();
String resourceType = configAccessor.get(resourceScope,
"type");
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3944971..3d58dbc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -13,6 +13,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.util.ArchiveFetchException;
@@ -91,7 +92,7 @@ public class S4HelixBootstrap implements Bootstrap {
// start a HelixController to manage the cluster
// TODO set this as optional (small clusters only)
String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
- HelixControllerMain.startHelixController(zookeeperAddress, clusterName, controllerName,
+ HelixControllerMain.startHelixController(zookeeperAddress, S4HelixConstants.HELIX_CLUSTER_NAME, controllerName,
HelixControllerMain.STANDALONE);
// this.parentInjector = parentInjector;
S4HelixBootstrap.rootInjector = parentInjector;
@@ -103,8 +104,8 @@ public class S4HelixBootstrap implements Bootstrap {
private void registerWithHelix() {
HelixManager helixManager;
try {
- helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
- zookeeperAddress);
+ helixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
+ InstanceType.PARTICIPANT, zookeeperAddress);
helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
helixManager.connect();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
index 3a598f7..2e155ed 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
@@ -21,6 +21,7 @@ package org.apache.s4.tools.helix;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,13 +47,16 @@ public class AddNodes {
for (int i = 0; i < clusterArgs.nbNodes; i++) {
String host = "localhost";
if (split.length > 0 && split.length == clusterArgs.nbNodes) {
- host = split[i].trim();
+ String node = split[i].trim();
+ if (!node.isEmpty()) {
+ host = node;
+ }
}
- InstanceConfig instanceConfig = new InstanceConfig("node_" + host + "_" + initialPort);
+ InstanceConfig instanceConfig = new InstanceConfig(host + "_" + initialPort);
instanceConfig.setHostName(host);
instanceConfig.setPort("" + initialPort);
- instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
- helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+ instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.clusterName);
+ helixAdmin.addInstance(S4HelixConstants.HELIX_CLUSTER_NAME, instanceConfig);
initialPort = initialPort + 1;
}
}
@@ -80,10 +84,6 @@ public class AddNodes {
@Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
int firstListeningPort = -1;
-
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
- String nodeGroup = "default";
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
index 73d05c2..8dce98e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
@@ -118,7 +118,7 @@ public class ClusterStatus extends S4ArgsBase {
System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Task Id", 20),
StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("Description", 90));
System.out.println(StatusUtils.generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 20), StatusUtils.inMiddle(cluster, 20),
+ System.out.format("%-30s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 30), StatusUtils.inMiddle(cluster, 20),
StatusUtils.inMiddle(streamName + " " + ((taskType == null) ? "(untyped)" : taskType), 90));
System.out.println(StatusUtils.generateEdge(130));
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
@@ -230,7 +230,7 @@ public class ClusterStatus extends S4ArgsBase {
} else {
System.out.format("%n%-50s", " ");
}
- System.out.format("%-10s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(), 10),
+ System.out.format("%-30s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(), 30),
StatusUtils.inMiddle(config.getHostName(), 50), StatusUtils.inMiddle(config.getPort() + "", 10),
StatusUtils.inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
index f71d221..d616196 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
@@ -18,10 +18,12 @@
package org.apache.s4.tools.helix;
+import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +42,19 @@ public class CreateCluster {
try {
logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbNodes);
ZKHelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
- helixAdmin.addCluster(clusterArgs.clusterName, false);
- StateModelDefinition onlineofflinemodel = new StateModelDefinition(
- new StateModelConfigGenerator().generateConfigForOnlineOffline());
- StateModelDefinition leaderstandbymodel = new StateModelDefinition(
- new StateModelConfigGenerator().generateConfigForLeaderStandby());
-
- helixAdmin.addStateModelDef(clusterArgs.clusterName, "OnlineOffline", onlineofflinemodel);
- helixAdmin.addStateModelDef(clusterArgs.clusterName, "LeaderStandby", leaderstandbymodel);
+ try {
+ helixAdmin.addCluster(S4HelixConstants.HELIX_CLUSTER_NAME, false);
+ StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+ new StateModelConfigGenerator().generateConfigForOnlineOffline());
+ StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+ new StateModelConfigGenerator().generateConfigForLeaderStandby());
+
+ helixAdmin.addStateModelDef(S4HelixConstants.HELIX_CLUSTER_NAME, "OnlineOffline", onlineofflinemodel);
+ helixAdmin.addStateModelDef(S4HelixConstants.HELIX_CLUSTER_NAME, "LeaderStandby", leaderstandbymodel);
+ } catch (HelixException he) {
+ // S4 configuration already exists, ignore
+ }
+
if (clusterArgs.nbNodes > 0) {
String[] split = clusterArgs.nodes.split(",");
int initialPort = clusterArgs.firstListeningPort;
@@ -59,8 +66,8 @@ public class CreateCluster {
}
instanceConfig.setHostName(host);
instanceConfig.setPort("" + initialPort);
- instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
- helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+ instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.clusterName);
+ helixAdmin.addInstance(S4HelixConstants.HELIX_CLUSTER_NAME, instanceConfig);
initialPort = initialPort + 1;
}
}
@@ -88,10 +95,6 @@ public class CreateCluster {
@Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
int firstListeningPort = -1;
-
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required = false, arity = 1)
- String nodeGroup = "default";
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
index 428cddc..36a45bc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -11,6 +11,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
@@ -28,32 +29,32 @@ public class CreateTask extends S4ArgsBase {
Tools.parseArgs(taskArgs, args);
String msg = String.format("Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
- taskArgs.taskId, taskArgs.streamName, taskArgs.nodeGroup);
+ taskArgs.taskId, taskArgs.streamName, taskArgs.clusterName);
logger.info(msg);
HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(taskArgs.clusterName).forResource(taskArgs.taskId).build();
+ ConfigScope scope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME).forResource(taskArgs.taskId).build();
Map<String, String> properties = new HashMap<String, String>();
- properties.put("GROUP", taskArgs.nodeGroup);
+ properties.put("GROUP", taskArgs.clusterName);
properties.put("type", "Task");
properties.put("streamName", taskArgs.streamName);
admin.setConfig(scope, properties);
// A task is modeled as a resource in Helix
- admin.addResource(taskArgs.clusterName, taskArgs.taskId, taskArgs.numPartitions, "LeaderStandby",
+ admin.addResource(S4HelixConstants.HELIX_CLUSTER_NAME, taskArgs.taskId, taskArgs.numPartitions, "LeaderStandby",
IdealStateModeProperty.AUTO.toString());
// This does the assignment of partition to nodes. It uses a modified
// version of consistent hashing to distribute active partitions and standbys
// equally among nodes.
List<String> instancesInGroup = new ArrayList<String>();
- List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+ List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
for (String instanceName : instancesInCluster) {
- InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+ InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName);
String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
- if (nodeGroup.equals(taskArgs.nodeGroup)) {
+ if (nodeGroup.equals(taskArgs.clusterName)) {
instancesInGroup.add(instanceName);
}
}
- admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1, instancesInGroup);
+ admin.rebalance(S4HelixConstants.HELIX_CLUSTER_NAME, taskArgs.taskId, taskArgs.numStandBys + 1, instancesInGroup);
logger.info("Finished setting up task:" + taskArgs.taskId + " on nodes " + instancesInGroup);
}
@@ -78,8 +79,5 @@ public class CreateTask extends S4ArgsBase {
@Parameter(names = { "-s", "-stream" }, description = "name of the stream the pe listens to", required = true, arity = 1)
String streamName;
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = false, arity = 1)
- String nodeGroup = "default";
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 4910bb1..686af79 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -16,6 +16,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.s4.comm.HelixBasedCommModule;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.core.HelixBasedCoreModule;
import org.apache.s4.core.util.AppConfig;
import org.apache.s4.tools.Deploy;
@@ -41,7 +42,7 @@ public class DeployApp extends S4ArgsBase {
HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
+ ConfigScope scope = builder.forCluster(S4HelixConstants.HELIX_CLUSTER_NAME).forResource(deployArgs.appName).build();
Map<String, String> properties = new HashMap<String, String>();
URI s4rURI = null;
@@ -81,7 +82,7 @@ public class DeployApp extends S4ArgsBase {
properties.putAll(appConfig.asMap());
admin.setConfig(scope, properties);
- IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
+ IdealState is = admin.getResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME, deployArgs.appName);
if (is == null) {
is = new IdealState(deployArgs.appName);
}
@@ -89,11 +90,11 @@ public class DeployApp extends S4ArgsBase {
is.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
is.setStateModelDefRef("OnlineOffline");
List<String> instancesInGroup = new ArrayList<String>();
- List<String> instancesInCluster = admin.getInstancesInCluster(deployArgs.clusterName);
+ List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
for (String instanceName : instancesInCluster) {
- InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
+ InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName);
String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
- if (nodeGroup.equals(deployArgs.nodeGroup)) {
+ if (nodeGroup.equals(deployArgs.clusterName)) {
instancesInGroup.add(instanceName);
}
}
@@ -101,7 +102,7 @@ public class DeployApp extends S4ArgsBase {
is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
}
- admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
+ admin.setResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME, deployArgs.appName, is);
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
@@ -125,9 +126,6 @@ public class DeployApp extends S4ArgsBase {
@Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application class (extending App or AdapterApp)", required = false)
String appClass = "";
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the App needs to be deployed", required = false, arity = 1)
- String nodeGroup = "default";
-
@Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
List<String> extraNamedParameters = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/36acbce8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
index c0aff7a..dcedf7c 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
@@ -7,6 +7,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
@@ -23,17 +24,17 @@ public class RebalanceTask extends S4ArgsBase {
// This does the assignment of partition to nodes. It uses a modified
// version of consistent hashing to distribute active partitions and standbys
// equally among nodes.
- IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
+ IdealState currentAssignment = admin.getResourceIdealState(S4HelixConstants.HELIX_CLUSTER_NAME, taskArgs.taskId);
List<String> instancesInGroup = new ArrayList<String>();
- List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+ List<String> instancesInCluster = admin.getInstancesInCluster(S4HelixConstants.HELIX_CLUSTER_NAME);
for (String instanceName : instancesInCluster) {
- InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+ InstanceConfig instanceConfig = admin.getInstanceConfig(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName);
String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
- if (nodeGroup.equals(taskArgs.nodeGroup)) {
+ if (nodeGroup.equals(taskArgs.clusterName)) {
instancesInGroup.add(instanceName);
}
}
- admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
+ admin.rebalance(S4HelixConstants.HELIX_CLUSTER_NAME, currentAssignment, instancesInGroup);
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
@@ -48,8 +49,5 @@ public class RebalanceTask extends S4ArgsBase {
@Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
String taskId;
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
- String nodeGroup = "default";
-
}
}