You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/21 20:07:19 UTC
[2/2] git commit: Flag S4 tools specific to Helix with "-helix" option + fix status output
Updated Branches:
refs/heads/S4-110-new 8af60c83e -> 0b93cae4b
Flag S4 tools specific to Helix with "-helix" option + fix status output
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0b93cae4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0b93cae4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0b93cae4
Branch: refs/heads/S4-110-new
Commit: 0b93cae4b9dbc8b65f2c98c177c3c9af41194268
Parents: d341879
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Feb 21 18:59:55 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Feb 21 21:05:56 2013 +0100
----------------------------------------------------------------------
.../main/java/org/apache/s4/core/BaseModule.java | 8 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 16 +-
.../src/main/java/org/apache/s4/core/S4Node.java | 8 +-
.../main/java/org/apache/s4/core/SenderImpl.java | 55 +--
.../core/moduleloader/ModuleLoaderTestUtils.java | 2 +-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 3 +-
.../org/apache/s4/wordcount/WordCountTest.java | 2 +-
.../src/main/java/org/apache/s4/tools/Status.java | 105 ++---
.../main/java/org/apache/s4/tools/StatusUtils.java | 65 +++
.../src/main/java/org/apache/s4/tools/Tools.java | 35 +-
.../java/org/apache/s4/tools/helix/AddNodes.java | 7 +-
.../org/apache/s4/tools/helix/ClusterStatus.java | 346 +++------------
.../org/apache/s4/tools/helix/CreateCluster.java | 4 +-
.../java/org/apache/s4/tools/helix/CreateTask.java | 11 +-
.../java/org/apache/s4/tools/helix/DeployApp.java | 17 +-
.../apache/s4/tools/helix/GenericEventAdapter.java | 10 +-
.../org/apache/s4/tools/helix/HelixS4ArgsBase.java | 16 +
.../org/apache/s4/tools/helix/RebalanceTask.java | 8 +-
.../java/org/apache/s4/tools/helix/RemoveTask.java | 21 +-
19 files changed, 274 insertions(+), 465 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 5715060..b92c87b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -36,13 +36,14 @@ public class BaseModule extends AbstractModule {
InputStream baseConfigInputStream;
String clusterName;
private final String instanceName;
- boolean useHelix = true;
+ boolean useHelix = false;
- public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName) {
+ public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName, boolean useHelix) {
super();
this.baseConfigInputStream = baseConfigInputStream;
this.clusterName = clusterName;
this.instanceName = instanceName;
+ this.useHelix = useHelix;
}
@Override
@@ -53,8 +54,7 @@ public class BaseModule extends AbstractModule {
// share the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
- String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
- if (config.getBoolean("s4.helix") || "HELIX".equalsIgnoreCase(clusterManager)) {
+ if (useHelix) {
bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromHelix.class).in(Scopes.SINGLETON);
bind(TaskStateModelFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 43d01c8..248a798 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -39,7 +39,7 @@ public class RemoteSender {
final private Hasher hasher;
AtomicInteger targetPartition = new AtomicInteger();
final private String remoteClusterName;
- private Cluster cluster;
+ private final Cluster cluster;
public RemoteSender(Cluster cluster, Emitter emitter, Hasher hasher, String clusterName) {
super();
@@ -50,8 +50,8 @@ public class RemoteSender {
}
- public void send(String streamName,String hashKey, ByteBuffer message) throws InterruptedException {
-
+ public void send(String streamName, String hashKey, ByteBuffer message) throws InterruptedException {
+
int partition;
if (hashKey == null) {
// round robin by default
@@ -59,11 +59,13 @@ public class RemoteSender {
} else {
partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(streamName));
}
- //TODO: where do we get the mode
-
+ // TODO: where do we get the mode
+
Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
- logger.info("Sending event to partition:"+ partition + " stream: "+streamName);
-
+
+ // TODO log the name of the partition
+ logger.trace("Sending event to partition [{}] through stream [{}]", String.valueOf(partition), streamName);
+
emitter.send(destination, message);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index ab2be80..1293daa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -48,8 +48,9 @@ public class S4Node {
}
});
- Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
- "default.s4.base.properties").openStream(), nodeArgs.clusterName, nodeArgs.instanceName) });
+ Injector injector = Guice
+ .createInjector(new Module[] { new BaseModule(Resources.getResource("default.s4.base.properties")
+ .openStream(), nodeArgs.clusterName, nodeArgs.instanceName, nodeArgs.useHelix) });
Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
try {
bootstrap.start(injector);
@@ -76,5 +77,8 @@ public class S4Node {
@Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
String instanceName = null;
+
+ @Parameter(names = "-helix", description = "Required flag when using a Helix based cluster manager", required = false, arity = 0)
+ boolean useHelix = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 6923f98..d81146d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -39,14 +39,12 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
/**
- * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top
- * level classes of the communication layer.
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
* <p>
- * {@link SenderImpl} is responsible for sending an event to a
- * {@link ProcessingElement} instance using a hashKey.
+ * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
* <p>
- * Details on how the cluster is partitioned and how events are serialized and
- * transmitted to its destination are hidden from the application developer.
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer.
*/
public class SenderImpl implements Sender {
@@ -76,10 +74,8 @@ public class SenderImpl implements Sender {
* a hashing function to map keys to partition IDs.
*/
@Inject
- public SenderImpl(Emitter emitter, SerializerDeserializer serDeser,
- Hasher hasher, Assignment assignment,
- SenderExecutorServiceFactory senderExecutorServiceFactory,
- Cluster cluster) {
+ public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+ SenderExecutorServiceFactory senderExecutorServiceFactory, Cluster cluster) {
this.emitter = emitter;
this.serDeser = serDeser;
this.hasher = hasher;
@@ -99,15 +95,12 @@ public class SenderImpl implements Sender {
/*
* (non-Javadoc)
*
- * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String,
- * org.apache.s4.base.Event)
+ * @see org.apache.s4.core.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
*/
@Override
public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- int partition = (int) (hasher.hash(hashKey) % emitter
- .getPartitionCount(event.getStreamName()));
- Destination destination = cluster.getDestination(event.getStreamName(),
- partition, emitter.getType());
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(event.getStreamName()));
+ Destination destination = cluster.getDestination(event.getStreamName(), partition, emitter.getType());
if (isDestinationLocal(destination)) {
metrics.sentLocal();
/* Hey we are in the same JVM, don't use the network. */
@@ -119,11 +112,9 @@ public class SenderImpl implements Sender {
}
private boolean isDestinationLocal(Destination destination) {
- if (emitter.getType().equals("tcp")) {
+ if (emitter.getType().equals("tcp")) {
TCPDestination tcpDestination = ((TCPDestination) destination);
- if (localNode != null
- && localNode.getMachineName().equalsIgnoreCase(
- tcpDestination.getMachineName())
+ if (localNode != null && tcpDestination.getMachineName().equalsIgnoreCase(localNode.getMachineName())
&& localNode.getPort() == tcpDestination.getPort()) {
return true;
}
@@ -139,9 +130,7 @@ public class SenderImpl implements Sender {
/*
* (non-Javadoc)
*
- * @see
- * org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event
- * )
+ * @see org.apache.s4.core.Sender#sendToRemotePartitions(org.apache.s4.base.Event )
*/
@Override
public void sendToAllRemotePartitions(Event event) {
@@ -153,8 +142,7 @@ public class SenderImpl implements Sender {
Event event;
int remotePartitionId;
- public SerializeAndSendToRemotePartitionTask(Event event,
- int remotePartitionId) {
+ public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
this.event = event;
this.remotePartitionId = remotePartitionId;
}
@@ -164,14 +152,11 @@ public class SenderImpl implements Sender {
ByteBuffer serializedEvent = serDeser.serialize(event);
try {
// TODO: where can we get the type ?
- Destination destination = cluster.getDestination(
- event.getStreamName(), remotePartitionId,
+ Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId,
emitter.getType());
emitter.send(destination, serializedEvent);
} catch (InterruptedException e) {
- logger.error(
- "Interrupted blocking send operation for event {}. Event is lost.",
- event);
+ logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
Thread.currentThread().interrupt();
}
@@ -191,23 +176,19 @@ public class SenderImpl implements Sender {
@Override
public void run() {
ByteBuffer serializedEvent = serDeser.serialize(event);
- Integer partitionCount = cluster.getPartitionCount(event
- .getStreamName());
+ Integer partitionCount = cluster.getPartitionCount(event.getStreamName());
for (int i = 0; i < partitionCount; i++) {
/* Don't use the comm layer when we send to the same partition. */
try {
// TODO: where to get the mode from
- Destination destination = cluster.getDestination(
- event.getStreamName(), i, "tcp");
+ Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
if (!isDestinationLocal(destination)) {
emitter.send(destination, serializedEvent);
metrics.sentEvent(i);
}
} catch (InterruptedException e) {
- logger.error(
- "Interrupted blocking send operation for event {}. Event is lost.",
- event);
+ logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
// no reason to continue: we were interrupted, so we reset
// the interrupt status and leave
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index aedac39..c0e5bb7 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -69,7 +69,7 @@ public class ModuleLoaderTestUtils {
}
Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), "cluster1", null),
+ .openStream(), "cluster1", null, false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
Emitter emitter = injector.getInstance(TCPEmitter.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 29bdc0a..19f1a26 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -115,7 +115,8 @@ public class CoreTestUtils extends CommTestUtils {
public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
return Guice.createInjector(Modules.override(
- new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+ new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null,
+ false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
new NonFailFastZookeeperClientsModule()));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 8d2b58c..69db25b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -60,7 +60,7 @@ public class WordCountTest extends ZkBasedTest {
public void createEmitter() throws IOException {
injector = Guice.createInjector(new BaseModule(
- Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null, false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 09ae37d..244b187 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -19,7 +19,6 @@
package org.apache.s4.tools;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -41,8 +40,6 @@ import com.google.common.collect.Maps;
public class Status extends S4ArgsBase {
static Logger logger = LoggerFactory.getLogger(Status.class);
- private static String NONE = "--";
-
public static void main(String[] args) {
StatusArgs statusArgs = new StatusArgs();
@@ -146,33 +143,33 @@ public class Status extends S4ArgsBase {
private static void showAppsStatus(List<Cluster> clusters) {
System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("URI", 90));
+ System.out.println(StatusUtils.generateEdge(130));
for (Cluster cluster : clusters) {
- if (!NONE.equals(cluster.app.name)) {
- System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
- inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+ if (!StatusUtils.NONE.equals(cluster.app.name)) {
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(cluster.app.name, 20),
+ StatusUtils.inMiddle(cluster.app.cluster, 20), cluster.app.uri);
}
}
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
}
private static void showClustersStatus(List<Cluster> clusters) {
System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
- generateEdge(80));
- System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
- inMiddle("Host", 50), inMiddle("Port", 8));
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", StatusUtils.inMiddle("Active nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("App", 20), StatusUtils.inMiddle("Tasks", 10),
+ StatusUtils.generateEdge(80));
+ System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", StatusUtils.inMiddle("Number", 8), StatusUtils.inMiddle("Task id", 10),
+ StatusUtils.inMiddle("Host", 50), StatusUtils.inMiddle("Port", 8));
+ System.out.println(StatusUtils.generateEdge(130));
for (Cluster cluster : clusters) {
- System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
- inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
- inMiddle("" + cluster.nodes.size(), 8));
+ System.out.format("%-20s%-20s%-10s%-10s", StatusUtils.inMiddle(cluster.clusterName, 20),
+ StatusUtils.inMiddle(cluster.app.name, 20), StatusUtils.inMiddle("" + cluster.taskNumber, 8),
+ StatusUtils.inMiddle("" + cluster.nodes.size(), 8));
boolean first = true;
for (ClusterNode node : cluster.nodes) {
if (first) {
@@ -180,68 +177,28 @@ public class Status extends S4ArgsBase {
} else {
System.out.format("%n%-60s", " ");
}
- System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
- inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+ System.out.format("%-10s%-50s%-10s", StatusUtils.inMiddle("" + node.getTaskId(), 10),
+ StatusUtils.inMiddle(node.getMachineName(), 50), StatusUtils.inMiddle(node.getPort() + "", 10));
}
System.out.println();
}
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
}
private static void showStreamsStatus(List<Stream> streams) {
System.out.println("Stream Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
- inMiddle("Consumers", 55));
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-55s%-55s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Producers", 55),
+ StatusUtils.inMiddle("Consumers", 55));
+ System.out.println(StatusUtils.generateEdge(130));
for (Stream stream : streams) {
- System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
- inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
- inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static String inMiddle(String content, int width) {
- int i = (width - content.length()) / 2;
- return String.format("%" + i + "s%s", " ", content);
- }
-
- private static String generateEdge(int length) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++) {
- sb.append("-");
+ System.out.format("%-20s%-55s%-55s%n", StatusUtils.inMiddle(stream.streamName, 20),
+ StatusUtils.inMiddle(StatusUtils.getFormatString(stream.producers, stream.clusterAppMap), 55),
+ StatusUtils.inMiddle(StatusUtils.getFormatString(stream.consumers, stream.clusterAppMap), 55));
}
- return sb.toString();
- }
+ System.out.println(StatusUtils.generateEdge(130));
- /**
- * show as cluster1(app1), cluster2(app2)
- *
- * @param clusters
- * cluster list
- * @param clusterAppMap
- * <cluster,app>
- * @return
- */
- private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
- if (clusters == null || clusters.size() == 0) {
- return NONE;
- } else {
- // show as: cluster1(app1), cluster2(app2)
- StringBuilder sb = new StringBuilder();
- for (String cluster : clusters) {
- String app = clusterAppMap.get(cluster);
- sb.append(cluster);
- if (!NONE.equals(app)) {
- sb.append("(").append(app).append(")");
- }
- sb.append(" ");
- }
- return sb.toString();
- }
}
static class Stream {
@@ -302,14 +259,14 @@ public class Status extends S4ArgsBase {
+ "/app/s4App"));
return appConfig.getAppName();
}
- return NONE;
+ return StatusUtils.NONE;
}
}
static class App {
- private String name = NONE;
+ private String name = StatusUtils.NONE;
private String cluster;
- private String uri = NONE;
+ private String uri = StatusUtils.NONE;
}
static class Cluster {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
new file mode 100644
index 0000000..ebfda15
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/StatusUtils.java
@@ -0,0 +1,65 @@
+package org.apache.s4.tools;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Strings;
+
+/**
+ * Some utility classes for formatting the output of Status tools
+ *
+ */
+public class StatusUtils {
+
+ /**
+ * show as cluster1(app1), cluster2(app2)
+ *
+ * @param clusters
+ * cluster list
+ * @param clusterAppMap
+ * <cluster,app>
+ * @return
+ */
+ public static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+ if (clusters == null || clusters.size() == 0) {
+ return StatusUtils.NONE;
+ } else {
+ // show as: cluster1(app1), cluster2(app2)
+ StringBuilder sb = new StringBuilder();
+ for (String cluster : clusters) {
+ String app = clusterAppMap.get(cluster);
+ sb.append(cluster);
+ if (!StatusUtils.NONE.equals(app)) {
+ sb.append("(").append(app).append(")");
+ }
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ }
+
+ public static String title(String content, char highlighter, int width) {
+ return Strings.repeat(String.valueOf(highlighter), ((width - content.length()) / 2)) + content
+ + Strings.repeat(String.valueOf(highlighter), ((width - content.length()) / 2));
+ }
+
+ public static String noInfo(String content) {
+ return StatusUtils.inMiddle("---- " + content + " ----", 130) + "\n\n";
+ }
+
+ public static String inMiddle(String content, int width) {
+ int i = (width - content.length()) / 2;
+ return String.format("%" + i + "s%s", " ", content);
+ }
+
+ public static String generateEdge(int length) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append("-");
+ }
+ return sb.toString();
+ }
+
+ static String NONE = "--";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 7bbaeb2..b22e164 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -22,18 +22,17 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.s4.core.S4Node;
import org.apache.s4.tools.helix.AddNodes;
+import org.apache.s4.tools.helix.ClusterStatus;
import org.apache.s4.tools.helix.CreateCluster;
import org.apache.s4.tools.helix.CreateTask;
import org.apache.s4.tools.helix.DeployApp;
import org.apache.s4.tools.helix.GenericEventAdapter;
import org.apache.s4.tools.helix.RebalanceTask;
-import org.apache.s4.tools.helix.ClusterStatus;
import org.apache.s4.tools.helix.RemoveTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,14 +61,15 @@ public class Tools {
createTask(null,CreateTask.class),
removeTask(null,RemoveTask.class),
rebalanceTask(null,RebalanceTask.class);
- //formatter:on
-
- Class<?> zkTarget;
- private Class<?> helixTarget;
-
+ //formatter:on
+
+ private final Class<?> zkTarget;
+ private final Class<?> helixTarget;
+
Task(Class<?> target) {
- this(target,target);
+ this(target, target);
}
+
Task(Class<?> zkTarget, Class<?> helixTarget) {
this.zkTarget = zkTarget;
this.helixTarget = helixTarget;
@@ -77,12 +77,20 @@ public class Tools {
public void dispatch(String[] args) {
try {
- String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
Class<?> target = zkTarget;
- if("HELIX".equalsIgnoreCase(clusterManager)){
- target= helixTarget;
+ if (Sets.newHashSet(args).contains("-helix")) {
+ target = helixTarget;
+ if (target == null) {
+ logger.error("{} is not a Helix related task", this.name());
+ System.exit(1);
+ }
+ } else {
+ if (target == null && helixTarget != null) {
+ logger.error("{} is a Helix related task, please specify -helix", this.name());
+ System.exit(1);
+ }
}
-
+
Method main = target.getMethod("main", String[].class);
main.invoke(null, new Object[] { args });
@@ -91,9 +99,8 @@ public class Tools {
logger.error("Cannot dispatch to task [{}]: wrong arguments [{}]", this.name(), Arrays.toString(args));
}
}
-
}
-
+
public static void main(String[] args) {
// configure log4j for Zookeeper
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
index 136a4aa..3a598f7 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
@@ -21,8 +21,6 @@ package org.apache.s4.tools.helix;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +44,11 @@ public class AddNodes {
if (clusterArgs.nbNodes > 0) {
String[] split = clusterArgs.nodes.split(",");
for (int i = 0; i < clusterArgs.nbNodes; i++) {
- InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
String host = "localhost";
if (split.length > 0 && split.length == clusterArgs.nbNodes) {
host = split[i].trim();
}
+ InstanceConfig instanceConfig = new InstanceConfig("node_" + host + "_" + initialPort);
instanceConfig.setHostName(host);
instanceConfig.setPort("" + initialPort);
instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
@@ -66,7 +64,7 @@ public class AddNodes {
}
@Parameters(commandNames = "s4 addNodes", separators = "=", commandDescription = "Setup new S4 logical cluster")
- static class ZKServerArgs extends S4ArgsBase {
+ static class ZKServerArgs extends HelixS4ArgsBase {
@Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
String clusterName = "s4-test-cluster";
@@ -85,6 +83,7 @@ public class AddNodes {
@Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
String nodeGroup = "default";
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
index 47ca98d..73d05c2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/ClusterStatus.java
@@ -19,50 +19,34 @@
package org.apache.s4.tools.helix;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.StatusUtils;
import org.apache.s4.tools.Tools;
-import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-import com.google.common.collect.Maps;
public class ClusterStatus extends S4ArgsBase {
static Logger logger = LoggerFactory.getLogger(ClusterStatus.class);
- private static String NONE = "--";
-
public static void main(String[] args) {
StatusArgs statusArgs = new StatusArgs();
@@ -95,6 +79,10 @@ public class ClusterStatus extends S4ArgsBase {
statusArgs.apps = apps;
statusArgs.streams = tasks;
}
+
+ System.out.println();
+ System.out.println(StatusUtils.title(" App Status ", '*', 130));
+
for (String app : statusArgs.apps) {
if (resourcesInCluster.contains(app)) {
printAppInfo(manager, cluster, app);
@@ -124,23 +112,33 @@ public class ClusterStatus extends S4ArgsBase {
String streamName = configAccessor.get(scope, "streamName");
String taskType = configAccessor.get(scope, "taskType");
- System.out.println("Task Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
- inMiddle("Description", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-40s%-90s%n", inMiddle(taskId, 40), inMiddle(cluster, 20),
- inMiddle(streamName + " " + taskType, 90));
- System.out.println(generateEdge(130));
+ System.out.println();
+ System.out.println(StatusUtils.title(" Task Status ", '*', 130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Task Id", 20),
+ StatusUtils.inMiddle("Cluster", 20), StatusUtils.inMiddle("Description", 90));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(taskId, 20), StatusUtils.inMiddle(cluster, 20),
+ StatusUtils.inMiddle(streamName + " " + ((taskType == null) ? "(untyped)" : taskType), 90));
+ System.out.println(StatusUtils.generateEdge(130));
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+ if (view == null) {
+ System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR TASK [" + taskId + "]"));
+ return;
+ }
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
- System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
- System.out.println(generateEdge(130));
+ System.out.format("%-30s%-100s%n", StatusUtils.inMiddle("Partition", 30), StatusUtils.inMiddle("State", 100));
+ System.out.println(StatusUtils.generateEdge(130));
for (String partition : assignment.getPartitionSet()) {
Map<String, String> stateMap = view.getStateMap(partition);
+ if (stateMap == null) {
+ System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR TASK [" + taskId
+ + "] / PARTITION [" + partition + "]"));
+ return;
+ }
StringBuilder sb = new StringBuilder();
String delim = "";
for (String instance : stateMap.keySet()) {
@@ -153,9 +151,12 @@ public class ClusterStatus extends S4ArgsBase {
}
delim = ", ";
}
- System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+ System.out.format("%-50s%-10s%n", StatusUtils.inMiddle(partition, 50),
+ StatusUtils.inMiddle(sb.toString(), 100));
}
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.println("\n\n");
+
}
private static void printAppInfo(HelixManager manager, String cluster, String app) {
@@ -164,28 +165,38 @@ public class ClusterStatus extends S4ArgsBase {
ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
String uri = configAccessor.get(scope, AppConfig.APP_URI);
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle("Name", 20), StatusUtils.inMiddle("Cluster", 20),
+ StatusUtils.inMiddle("URI", 90));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", StatusUtils.inMiddle(app, 20), StatusUtils.inMiddle(cluster, 20),
+ StatusUtils.inMiddle(uri, 90));
+ System.out.println(StatusUtils.generateEdge(130));
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+ if (view == null) {
+ System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR APP [" + app + "]"));
+ return;
+ }
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
Map<String, String> appStateMap = view.getStateMap(app);
- System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
- System.out.println(generateEdge(130));
+ if (appStateMap == null) {
+ System.out.println(StatusUtils.noInfo("INFORMATION NOT AVAILABLE FOR APP [" + app + "]"));
+ return;
+ }
+ System.out.format("%-50s%-20s%n", StatusUtils.inMiddle("Node id", 50), StatusUtils.inMiddle("DEPLOYED", 20));
+ System.out.println(StatusUtils.generateEdge(130));
for (String instance : assignmentMap.keySet()) {
String state = appStateMap.get(instance);
- System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
- inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+ System.out.format("%-50s%-10s%n", StatusUtils.inMiddle(instance, 50), StatusUtils.inMiddle(
+ (("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
}
- System.out.println(generateEdge(130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.println("\n\n");
}
@@ -197,17 +208,18 @@ public class ClusterStatus extends S4ArgsBase {
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
- inMiddle("Active", 10), generateEdge(80));
- System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
- inMiddle("Port", 8), inMiddle("Active", 10));
- System.out.println(generateEdge(130));
-
- System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
- inMiddle("" + liveInstances.size(), 8));
+ System.out.println();
+ System.out.println(StatusUtils.title(" Cluster Status ", '*', 130));
+ System.out.println(StatusUtils.generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", StatusUtils.inMiddle("Nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", StatusUtils.inMiddle("Cluster Name", 20),
+ StatusUtils.inMiddle("Nodes", 20), StatusUtils.inMiddle("Active", 10), StatusUtils.generateEdge(80));
+ System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", StatusUtils.inMiddle("Node id", 10),
+ StatusUtils.inMiddle("Host", 50), StatusUtils.inMiddle("Port", 8), StatusUtils.inMiddle("Active", 10));
+ System.out.println(StatusUtils.generateEdge(130));
+
+ System.out.format("%-20s%-20s%-10s", StatusUtils.inMiddle(cluster, 20),
+ StatusUtils.inMiddle("" + instances.size(), 8), StatusUtils.inMiddle("" + liveInstances.size(), 8));
boolean first = true;
for (String instance : instances) {
@@ -218,16 +230,16 @@ public class ClusterStatus extends S4ArgsBase {
} else {
System.out.format("%n%-50s", " ");
}
- System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
- inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
- inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
+ System.out.format("%-10s%-46s%-10s%-10s", StatusUtils.inMiddle("" + config.getId(), 10),
+ StatusUtils.inMiddle(config.getHostName(), 50), StatusUtils.inMiddle(config.getPort() + "", 10),
+ StatusUtils.inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
}
- System.out.println();
+ System.out.println("\n\n");
}
@Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
- static class StatusArgs extends S4ArgsBase {
+ static class StatusArgs extends HelixS4ArgsBase {
@Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
List<String> apps;
@@ -245,226 +257,4 @@ public class ClusterStatus extends S4ArgsBase {
int timeout = 10000;
}
- private static void showAppsStatus(List<Cluster> clusters) {
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- for (Cluster cluster : clusters) {
- if (!NONE.equals(cluster.app.name)) {
- System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
- inMiddle(cluster.app.cluster, 20), cluster.app.uri);
- }
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static void showClustersStatus(List<Cluster> clusters) {
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
- generateEdge(80));
- System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
- inMiddle("Host", 50), inMiddle("Port", 8));
- System.out.println(generateEdge(130));
-
- for (Cluster cluster : clusters) {
- System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
- inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
- inMiddle("" + cluster.nodes.size(), 8));
- boolean first = true;
- for (ClusterNode node : cluster.nodes) {
- if (first) {
- first = false;
- } else {
- System.out.format("%n%-60s", " ");
- }
- System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
- inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
- }
- System.out.println();
- }
- System.out.println(generateEdge(130));
- }
-
- private static void showStreamsStatus(List<Stream> streams) {
- System.out.println("Stream Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
- inMiddle("Consumers", 55));
- System.out.println(generateEdge(130));
-
- for (Stream stream : streams) {
- System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
- inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
- inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static String inMiddle(String content, int width) {
- int i = (width - content.length()) / 2;
- return String.format("%" + i + "s%s", " ", content);
- }
-
- private static String generateEdge(int length) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++) {
- sb.append("-");
- }
- return sb.toString();
- }
-
- /**
- * show as cluster1(app1), cluster2(app2)
- *
- * @param clusters
- * cluster list
- * @param clusterAppMap
- * <cluster,app>
- * @return
- */
- private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
- if (clusters == null || clusters.size() == 0) {
- return NONE;
- } else {
- // show as: cluster1(app1), cluster2(app2)
- StringBuilder sb = new StringBuilder();
- for (String cluster : clusters) {
- String app = clusterAppMap.get(cluster);
- sb.append(cluster);
- if (!NONE.equals(app)) {
- sb.append("(").append(app).append(")");
- }
- sb.append(" ");
- }
- return sb.toString();
- }
- }
-
- static class Stream {
-
- private final ZkClient zkClient;
- private final String consumerPath;
- private final String producerPath;
-
- String streamName;
- Set<String> producers = new HashSet<String>();// cluster name
- Set<String> consumers = new HashSet<String>();// cluster name
-
- Map<String, String> clusterAppMap = Maps.newHashMap();
-
- public Stream(String streamName, ZkClient zkClient) throws Exception {
- this.streamName = streamName;
- this.zkClient = zkClient;
- this.consumerPath = "/s4/streams/" + streamName + "/consumers";
- this.producerPath = "/s4/streams/" + streamName + "/producers";
- readStreamFromZk();
- }
-
- private void readStreamFromZk() throws Exception {
- List<String> consumerNodes = zkClient.getChildren(consumerPath);
- for (String node : consumerNodes) {
- ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
- consumers.add(consumer.getSimpleField("clusterName"));
- }
-
- List<String> producerNodes = zkClient.getChildren(producerPath);
- for (String node : producerNodes) {
- ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
- producers.add(consumer.getSimpleField("clusterName"));
- }
-
- getAppNames();
- }
-
- private void getAppNames() {
- Set<String> clusters = new HashSet<String>(consumers);
- clusters.addAll(producers);
- for (String cluster : clusters) {
- clusterAppMap.put(cluster, getApp(cluster, zkClient));
- }
- }
-
- public boolean containsCluster(String cluster) {
- if (producers.contains(cluster) || consumers.contains(cluster)) {
- return true;
- }
- return false;
- }
-
- private static String getApp(String clusterName, ZkClient zkClient) {
- String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- if (zkClient.exists(appPath)) {
- ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
- return appRecord.getSimpleField("name");
- }
- return NONE;
- }
- }
-
- static class App {
- private String name = NONE;
- private String cluster;
- private String uri = NONE;
- }
-
- static class Cluster {
- private final ZkClient zkClient;
- private final String taskPath;
- private final String processPath;
- private final String appPath;
-
- String clusterName;
- int taskNumber;
- App app;
-
- List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
- public Cluster(String clusterName, ZkClient zkClient) throws Exception {
- this.clusterName = clusterName;
- this.zkClient = zkClient;
- this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
- this.processPath = "/s4/clusters/" + clusterName + "/process";
- this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- readClusterFromZk();
- }
-
- public void readClusterFromZk() throws Exception {
- List<String> processes;
- List<String> tasks;
-
- tasks = zkClient.getChildren(taskPath);
- processes = zkClient.getChildren(processPath);
-
- taskNumber = tasks.size();
-
- for (int i = 0; i < processes.size(); i++) {
- ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
- if (process != null) {
- int partition = Integer.parseInt(process.getSimpleField("partition"));
- String host = process.getSimpleField("host");
- int port = Integer.parseInt(process.getSimpleField("port"));
- String taskId = process.getSimpleField("taskId");
- ClusterNode node = new ClusterNode(partition, port, host, taskId);
- nodes.add(node);
- }
- }
-
- app = new App();
- app.cluster = clusterName;
- try {
- ZNRecord appRecord = zkClient.readData(appPath);
- app.name = appRecord.getSimpleField("name");
- app.uri = appRecord.getSimpleField("s4r_uri");
- } catch (ZkNoNodeException e) {
- logger.warn(appPath + " doesn't exist");
- }
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
index 89361a3..f71d221 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateCluster.java
@@ -22,8 +22,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +72,7 @@ public class CreateCluster {
}
@Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
- static class ZKServerArgs extends S4ArgsBase {
+ static class ZKServerArgs extends HelixS4ArgsBase {
@Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
String clusterName = "s4-test-cluster";
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
index d6074da..428cddc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/CreateTask.java
@@ -1,7 +1,6 @@
package org.apache.s4.tools.helix;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -10,9 +9,8 @@ import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
import org.slf4j.Logger;
@@ -29,8 +27,7 @@ public class CreateTask extends S4ArgsBase {
CreateTaskArgs taskArgs = new CreateTaskArgs();
Tools.parseArgs(taskArgs, args);
- String msg = String.format(
- "Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
+ String msg = String.format("Setting up new pe [{}] for stream(s) on nodes belonging to node group {}",
taskArgs.taskId, taskArgs.streamName, taskArgs.nodeGroup);
logger.info(msg);
HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
@@ -56,12 +53,12 @@ public class CreateTask extends S4ArgsBase {
instancesInGroup.add(instanceName);
}
}
- admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1,instancesInGroup);
+ admin.rebalance(taskArgs.clusterName, taskArgs.taskId, taskArgs.numStandBys + 1, instancesInGroup);
logger.info("Finished setting up task:" + taskArgs.taskId + " on nodes " + instancesInGroup);
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class CreateTaskArgs extends S4ArgsBase {
+ static class CreateTaskArgs extends HelixS4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 693e13d..4910bb1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -67,8 +67,13 @@ public class DeployApp extends S4ArgsBase {
HelixBasedCoreModule.class.getName());
// TODO merge with custom modules
- AppConfig appConfig = new AppConfig.Builder().appClassName(deployArgs.appClass).appName(deployArgs.appName)
- .appURI(deployArgs.s4rPath).customModulesNames(helixModules).customModulesURIs(null)
+ AppConfig appConfig = new AppConfig.Builder()
+ .appClassName(deployArgs.appClass)
+ .appName(deployArgs.appName)
+ .appURI(deployArgs.s4rPath)
+ .customModulesNames(
+ new ImmutableList.Builder<String>().addAll(helixModules).addAll(deployArgs.modulesClassesNames)
+ .build()).customModulesURIs(deployArgs.modulesURIs)
.namedParameters(Deploy.convertListArgsToMap(deployArgs.extraNamedParameters)).build();
// properties.put("appConfig", appConfig.asMap());
// properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
@@ -100,7 +105,7 @@ public class DeployApp extends S4ArgsBase {
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class DeployAppArgs extends S4ArgsBase {
+ static class DeployAppArgs extends HelixS4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
@@ -126,5 +131,11 @@ public class DeployApp extends S4ArgsBase {
@Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
List<String> extraNamedParameters = new ArrayList<String>();
+ @Parameter(names = { "-modulesURIs", "-mu" }, description = "URIs for fetching code of custom modules")
+ List<String> modulesURIs = new ArrayList<String>();
+
+ @Parameter(names = { "-modulesClasses", "-emc", "-mc" }, description = "Fully qualified class names of custom modules")
+ List<String> modulesClassesNames = new ArrayList<String>();
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index bb86312..6d62a1e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -11,7 +11,6 @@ import org.apache.s4.base.Event;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ClusterFromHelix;
-import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
import com.beust.jcommander.Parameter;
@@ -30,7 +29,7 @@ public class GenericEventAdapter {
ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
30, 60);
manager.connect();
- manager.addExternalViewChangeListener(cluster);
+ manager.addExternalViewChangeListener(cluster);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -43,9 +42,10 @@ public class GenericEventAdapter {
event.setStreamId("names");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
KryoSerDeser serializer = new KryoSerDeser(classLoader);
-// EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+ // EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
System.out.println("Sending event to partition:" + partitionId);
- Destination destination = cluster.getDestination(adapterArgs.streamName, partitionId, emitter.getType());
+ Destination destination = cluster
+ .getDestination(adapterArgs.streamName, partitionId, emitter.getType());
emitter.send(destination, serializer.serialize(event));
Thread.sleep(1000);
}
@@ -56,7 +56,7 @@ public class GenericEventAdapter {
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class AdapterArgs extends S4ArgsBase {
+ static class AdapterArgs extends HelixS4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
new file mode 100644
index 0000000..51e3cd3
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/HelixS4ArgsBase.java
@@ -0,0 +1,16 @@
+package org.apache.s4.tools.helix;
+
+import org.apache.s4.tools.S4ArgsBase;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Base class for the command-line arguments of Helix-related S4 tools.
+ * Declares the hidden, mandatory, zero-arity {@code -helix} flag that every subclass inherits.
+ */
+public abstract class HelixS4ArgsBase extends S4ArgsBase {
+
+ @Parameter(names = { "-helix" }, description = "Use Helix - required for Helix-related S4 commands", required = true, hidden = true, arity = 0)
+ boolean helix;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
index e4f4526..c0aff7a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RebalanceTask.java
@@ -1,18 +1,12 @@
package org.apache.s4.tools.helix;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.helix.ConfigScope;
-import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
@@ -43,7 +37,7 @@ public class RebalanceTask extends S4ArgsBase {
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class RebalanceTaskArgs extends S4ArgsBase {
+ static class RebalanceTaskArgs extends HelixS4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0b93cae4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
index 044e133..1ca06cc 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/RemoveTask.java
@@ -1,19 +1,9 @@
package org.apache.s4.tools.helix;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.ConfigScope;
-import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
-import org.apache.s4.tools.helix.CreateTask.CreateTaskArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,25 +11,22 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
public class RemoveTask extends S4ArgsBase {
-
+
static Logger logger = LoggerFactory.getLogger(CreateTask.class);
public static void main(String[] args) {
RemoveTaskArgs taskArgs = new RemoveTaskArgs();
Tools.parseArgs(taskArgs, args);
- String msg = String.format(
- "Removing task [{}] from cluster [{}]",
- taskArgs.taskId, taskArgs.clusterName);
+ String msg = String.format("Removing task [{}] from cluster [{}]", taskArgs.taskId, taskArgs.clusterName);
logger.info(msg);
HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
admin.dropResource(taskArgs.clusterName, taskArgs.taskId);
- logger.info("Finished Removing task:" + taskArgs.taskId + " from cluster:"+ taskArgs.clusterName);
+ logger.info("Finished Removing task:" + taskArgs.taskId + " from cluster:" + taskArgs.clusterName);
}
-
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class RemoveTaskArgs extends S4ArgsBase {
+ static class RemoveTaskArgs extends HelixS4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";