You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[8/22] git commit: improved scripts + added adapter facilities -
added a simple runnable project template - added a facility to start an
adapter node directly from the current project - added some documentation -
improved error handling in tcp emitter
improved scripts + added adapter facilities
- added a simple runnable project template
- added a facility to start an adapter node directly from the current
project
- added some documentation
- improved error handling in tcp emitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cad14b40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cad14b40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cad14b40
Branch: refs/heads/piper
Commit: cad14b4035c3a076c614e1f995d2d352ffcf0569
Parents: 99b6f04
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu May 24 20:10:00 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu May 24 20:10:00 2012 +0200
----------------------------------------------------------------------
build.gradle | 2 +-
s4 | 36 +--
.../java/org/apache/s4/comm/DefaultCommModule.java | 129 +++++++++
.../org/apache/s4/comm/RemoteEmitterFactory.java | 10 +
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 32 ++-
.../java/org/apache/s4/comm/tools/TaskSetup.java | 4 +
.../org/apache/s4/comm/topology/RemoteStreams.java | 8 +
.../apache/s4/comm/topology/StreamConsumer.java | 28 ++
.../org/apache/s4/comm/udp/UDPRemoteEmitter.java | 18 ++
.../src/main/resources/default.s4.comm.properties | 8 +
.../s4-comm/src/main/resources/s4-comm.properties | 9 -
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 14 +-
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 12 +-
.../ZkBasedClusterManagementTestModule.java | 101 -------
.../src/test/resources/default.s4.properties | 6 -
.../src/test/resources/udp.s4.comm.properties | 9 +
.../src/main/java/org/apache/s4/core/App.java | 49 +----
.../java/org/apache/s4/core/DefaultCoreModule.java | 76 ++++++
.../java/org/apache/s4/core/DefaultModule.java | 99 -------
.../src/main/java/org/apache/s4/core/Main.java | 210 ++++++++-------
.../src/main/java/org/apache/s4/core/Receiver.java | 1 +
.../java/org/apache/s4/core/RemoteSenders.java | 9 +-
.../src/main/java/org/apache/s4/core/Server.java | 18 --
.../org/apache/s4/core/adapter/AdapterApp.java | 52 ++++
.../s4/core/util/ParametersInjectionModule.java | 28 ++
.../src/main/resources/apps/CounterExample.s4r | Bin 53071 -> 0 bytes
.../src/main/resources/default.s4.core.properties | 2 +
.../s4-core/src/main/resources/s4-core.properties | 5 -
.../test/java/org/apache/s4/core/TriggerTest.java | 6 +-
.../apache/s4/deploy/TestAutomaticDeployment.java | 71 ++----
.../s4/deploy/prodcon/TestProducerConsumer.java | 28 +--
.../org/apache/s4/fixtures/ZkBasedAppModule.java | 40 ---
.../org/apache/s4/wordcount/WordCountModule.java | 10 +-
.../org/apache/s4/wordcount/WordCountTest.java | 13 +-
.../src/test/resources/default.s4.properties | 12 -
.../java/org/apache/s4/example/model/Main.java | 18 +-
subprojects/s4-tools/s4-tools.gradle | 8 +
.../main/java/org/apache/s4/tools/CreateApp.java | 139 ++++++++++
.../java/org/apache/s4/tools/DefineCluster.java | 4 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 17 +-
.../main/java/org/apache/s4/tools/S4ArgsBase.java | 3 +
.../src/main/java/org/apache/s4/tools/Tools.java | 86 +++++-
.../src/main/resources/templates/HelloApp.java.txt | 34 +++
.../resources/templates/HelloInputAdapter.java.txt | 60 ++++
.../src/main/resources/templates/HelloPE.java.txt | 29 ++
.../src/main/resources/templates/build.gradle | 168 ++++++++++++
.../s4-tools/src/main/resources/templates/gradlew | 2 +
.../src/main/resources/templates/newApp.README | 37 +++
.../s4-tools/src/main/resources/templates/s4 | 17 ++
test-apps/twitter-adapter/build.gradle | 5 +
.../s4/example/twitter/TwitterInputAdapter.java | 15 +-
test-apps/twitter-counter/build.gradle | 6 +-
.../org/apache/s4/example/twitter/TopNTopicPE.java | 23 ++-
53 files changed, 1193 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 58ff560..b6140ed 100644
--- a/build.gradle
+++ b/build.gradle
@@ -72,7 +72,7 @@ libraries = [
junit: 'junit:junit:4.10',
zkclient: 'com.github.sgroschupf:zkclient:0.1',
diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
- jcommander: 'com.beust:jcommander:1.23',
+ jcommander: 'com.beust:jcommander:1.25',
commons_io: 'commons-io:commons-io:2.1',
gradle_base_services: 'gradle-base-services:gradle-base-services:1.0-rc-3',
gradle_core: 'gradle-core:gradle-core:1.0-rc-3',
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
index 56745a2..61517a1 100755
--- a/s4
+++ b/s4
@@ -4,29 +4,10 @@
GRADLE=`pwd`/gradlew
-case "$1" in
-"deploy")
- shift
- subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
-;;
-"zkServer")
- shift
- subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
-;;
-"newCluster")
- shift
- subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
-;;
-"appNode")
- shift
- subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
-;;
-"adapterNode")
- shift
- subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
-;;
-
-esac
+S4_DIR="$( cd -P "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+S4_SCRIPT_PATH="$S4_DIR/s4"
+
+subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
# EXAMPLES
# deploy:
@@ -35,11 +16,8 @@ esac
# newCluster:
# # examples:
-#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
-
-# appNode:
-# ./s4 appNode subprojects/s4-core/src/test/resources/default.s4.properties
+#./s4 newCluster -c=s4-test-cluster -flp=11000 -nbTasks=2 ; ./s4 newCluster -c=s4-adapter-cluster -flp=13000 -nbTasks=1
-# adapterNode:
-# ./s4 adapterNode -s4Properties=test-apps/twitter-adapter/src/main/resources/s4.properties
+# node:
+# ./s4 node -c=s4-test-cluster
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
new file mode 100644
index 0000000..df38071
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -0,0 +1,129 @@
+package org.apache.s4.comm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+
+/**
+ * Default configuration module for the communication layer, parameterized through a configuration file. It binds the
+ * hasher, the serializer, the *FromZK topology classes, and the emitter/listener classes named in the configuration.
+ */
+public class DefaultCommModule extends AbstractModule {
+
+ private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
+ InputStream commConfigInputStream;
+ private PropertiesConfiguration config;
+ String clusterName;
+
+ /**
+ * Creates the module. The configuration stream is read, then closed, when the module is configured.
+ * @param commConfigInputStream
+ * input stream from a configuration file
+ * @param clusterName
+ * the name of the cluster to which the current node belongs; may be null. It is ignored (with a logged
+ * warning) when the configuration file already defines cluster.name.
+ */
+ public DefaultCommModule(InputStream commConfigInputStream, String clusterName) {
+ super();
+ this.commConfigInputStream = commConfigInputStream;
+ this.clusterName = clusterName;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ if (commConfigInputStream != null) {
+ try {
+ commConfigInputStream.close();
+ } catch (IOException ignored) { // best effort: a failure to close the stream is deliberately ignored
+ }
+ }
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ bind(Assignment.class).to(AssignmentFromZK.class);
+ bind(Clusters.class).to(ClustersFromZK.class);
+ bind(Cluster.class).to(ClusterFromZK.class);
+
+ try {
+ Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
+ .getString("comm.emitter.class"));
+ bind(Emitter.class).to(emitterClass);
+
+ // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+ Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
+ .getString("comm.emitter.remote.class"));
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+ RemoteEmitterFactory.class));
+ bind(RemoteEmitters.class);
+
+ bind(Listener.class).to((Class<? extends Listener>) Class.forName(config.getString("comm.listener.class")));
+
+ } catch (ClassNotFoundException e) {
+ logger.error("Cannot find class implementation ", e); // NOTE(review): only logged; bindings after the failing lookup are skipped
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(commConfigInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+ if (clusterName != null) {
+ if (config.containsKey("cluster.name")) {
+ logger.warn(
+ "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
+ clusterName, config.getProperty("cluster.name"));
+ } else {
+ Names.bindProperties(binder, new HashMap<String, String>() { // anonymous subclass, hence the "serial" suppression
+ {
+ put("cluster.name", clusterName);
+ }
+ });
+ }
+ }
+
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
new file mode 100644
index 0000000..3da803d
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.topology.Cluster;
+/** Factory creating a {@link RemoteEmitter} for a given {@link Cluster} topology. */
+public interface RemoteEmitterFactory {
+ /** Returns a remote emitter for the cluster described by {@code topology}. */
+ RemoteEmitter createRemoteEmitter(Cluster topology);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 623172a..7bff4af 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -1,6 +1,5 @@
package org.apache.s4.comm.tcp;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Hashtable;
@@ -10,9 +9,9 @@ import java.util.concurrent.Executors;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterChangeListener;
+import org.apache.s4.comm.topology.ClusterNode;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -198,6 +197,8 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
// the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
// no?
+ // NOTE: might be fixed in S4-7
+
// */
// if (!channel.isWritable()) {
// synchronized (sendLock) {
@@ -241,7 +242,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
}
if (f.isCancelled()) {
- logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
+ logger.error("Send I/O was cancelled! " + f.getChannel().getRemoteAddress());
} else if (!f.isSuccess()) {
logger.error("Exception on I/O operation", f.getCause());
logger.error(String.format("I/O on partition %d failed!", partitionId));
@@ -291,22 +292,23 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
@Override
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+ String target;
if (partitionId == null) {
- logger.error("Error on mystery channel!!");
+ target = "unknown channel";
+ } else {
+ target = "channel for partition [" + partitionId + "], target node host ["
+ + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
+ + partitionNodeMap.get(partitionId).getPort() + "]";
}
- logger.error("Error on channel to partition " + partitionId);
+ logger.error(
+ "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
+ target);
- try {
- throw event.getCause();
- } catch (ConnectException ce) {
- logger.error(ce.getMessage(), ce);
- } catch (Throwable err) {
- logger.error("Error", err);
- if (context.getChannel().isOpen()) {
- logger.error("Closing channel due to exception");
- context.getChannel().close();
- }
+ if (context.getChannel().isOpen()) {
+ logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
+ context.getChannel().close();
}
+
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 0458c70..aadcd4d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -48,4 +48,8 @@ public class TaskSetup {
}
}
+ public void disconnect() {
+ zkclient.close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 6b5d094..dd93682 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -188,6 +188,13 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
}
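+ /**
+ * Publishes interest in a stream from an application.
+ *
+ * @param appId identifier of the consuming application, stored in the consumer record
+ * @param clusterName cluster name stored in the consumer record
+ * @param streamName name of the stream; the consumer znode is created under this stream's consumer path
+ */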
public void addInputStream(int appId, String clusterName, String streamName) {
lock.lock();
try {
@@ -198,6 +205,7 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
consumer.putSimpleField("appId", String.valueOf(appId));
consumer.putSimpleField("clusterName", clusterName);
try {
+ // NOTE: We create 1 sequential znode per consumer node instance
zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
} catch (Throwable e) {
logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index f21213a..ede16a3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -24,4 +24,32 @@ public class StreamConsumer {
return clusterName;
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + appId;
+ result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ StreamConsumer other = (StreamConsumer) obj;
+ if (appId != other.appId)
+ return false;
+ if (clusterName == null) {
+ if (other.clusterName != null)
+ return false;
+ } else if (!clusterName.equals(other.clusterName))
+ return false;
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
new file mode 100644
index 0000000..4fa9aa0
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -0,0 +1,18 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class UDPRemoteEmitter extends UDPEmitter {
+
+ /**
+ * Creates an emitter that sends to the remote subcluster described by {@code topology}. Instances are created
+ * dynamically, through an injected factory, when new subclusters are discovered (as remote stream outputs).
+ */
+ @Inject
+ public UDPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
+ super(topology);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
new file mode 100644
index 0000000..07497b2
--- /dev/null
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -0,0 +1,8 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
+comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
+comm.listener.class=org.apache.s4.comm.tcp.TCPListener
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/resources/s4-comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/s4-comm.properties b/subprojects/s4-comm/src/main/resources/s4-comm.properties
deleted file mode 100644
index aa0c843..0000000
--- a/subprojects/s4-comm/src/main/resources/s4-comm.properties
+++ /dev/null
@@ -1,9 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-cluster.isCluster = true
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index cb39c1b..268715b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -2,35 +2,39 @@ package org.apache.s4.comm.tcp;
import java.io.IOException;
+import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.name.Names;
public class TCPCommTest extends ZkBasedTest {
DeliveryTestUtil util;
+ public final static String CLUSTER_NAME = "cluster1";
@Before
public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(new TCPCommTestModule());
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
+ new TCPCommTestModule());
util = injector.getInstance(DeliveryTestUtil.class);
}
- class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
+ class TCPCommTestModule extends AbstractModule {
TCPCommTestModule() {
- super(TCPEmitter.class, TCPRemoteEmitter.class, TCPListener.class);
+
}
@Override
protected void configure() {
- super.configure();
bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index e224b99..8f01a9b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -2,14 +2,17 @@ package org.apache.s4.comm.udp;
import java.io.IOException;
+import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.comm.tcp.TCPCommTest;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.name.Names;
@@ -19,18 +22,17 @@ public class UDPCommTest extends ZkBasedTest {
@Before
public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(new UDPCommTestModule());
+ Injector injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+ .openStream(), TCPCommTest.CLUSTER_NAME), new UDPCommTestModule());
util = injector.getInstance(DeliveryTestUtil.class);
}
- class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
+ class UDPCommTestModule extends AbstractModule {
UDPCommTestModule() {
- super(UDPEmitter.class, null, UDPListener.class);
}
@Override
protected void configure() {
- super.configure();
bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
deleted file mode 100644
index a1e7089..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.RemoteEmitterFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.ClustersFromZK;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-public class ZkBasedClusterManagementTestModule extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private Class<? extends Emitter> emitterClass = null;
- private Class<? extends RemoteEmitter> remoteEmitterClass = null;
- private Class<? extends Listener> listenerClass = null;
-
- protected ZkBasedClusterManagementTestModule() {
- }
-
- protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
- Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
- this.emitterClass = emitterClass;
- this.remoteEmitterClass = remoteEmitterClass;
- this.listenerClass = listenerClass;
- }
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Clusters.class).to(ClustersFromZK.class);
- bind(Cluster.class).to(ClusterFromZK.class);
-
- // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
- if (this.remoteEmitterClass != null) {
- install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
- RemoteEmitterFactory.class));
- } else {
- install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
- RemoteEmitterFactory.class));
- }
-
- bind(RemoteEmitters.class);
-
- if (this.emitterClass != null) {
- bind(Emitter.class).to(this.emitterClass);
- } else {
- bind(Emitter.class).to(TCPEmitter.class);
- }
-
- if (this.listenerClass != null) {
- bind(Listener.class).to(this.listenerClass);
- } else {
- bind(Listener.class).to(TCPListener.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
deleted file mode 100644
index 6624ff4..0000000
--- a/subprojects/s4-comm/src/test/resources/default.s4.properties
+++ /dev/null
@@ -1,6 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.name = cluster1
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
new file mode 100644
index 0000000..615f114
--- /dev/null
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@ -0,0 +1,9 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+comm.emitter.class = org.apache.s4.comm.udp.UDPEmitter
+comm.emitter.remote.class = org.apache.s4.comm.udp.UDPRemoteEmitter
+comm.listener.class = org.apache.s4.comm.udp.UDPListener
+cluster.name = cluster1
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c6bffd3..935d4a5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -16,7 +16,6 @@
package org.apache.s4.core;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -26,13 +25,11 @@ import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.App.ClockType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import com.google.inject.name.Named;
/*
@@ -384,48 +381,4 @@ public abstract class App {
return stream != null ? stream.getName() + " " : "null ";
}
- /**
- * Facility for starting S4 apps by passing a module class and an application class
- *
- * Usage: java <classpath+params> org.apache.s4.core.App <appClassName> <moduleClassName>
- *
- */
- public static void main(String[] args) {
- if (args.length != 2) {
- usage(args);
- }
- logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
- Injector injector = null;
- try {
- if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
- logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
- System.exit(-1);
- }
- injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
- } catch (InstantiationException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
- System.exit(-1);
- } catch (IllegalAccessException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
- System.exit(-1);
- } catch (ClassNotFoundException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
- System.exit(-1);
- }
- App app;
- try {
- app = (App) injector.getInstance(Class.forName(args[1]));
- app.init();
- app.start();
- } catch (ClassNotFoundException e) {
- logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
- }
- }
-
- private static void usage(String[] args) {
- logger.info("Invalid parameters " + Arrays.toString(args)
- + " \nUsage: java <classpath+params> org.apache.s4.core.App <appClassName> <moduleClassName>");
- System.exit(-1);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
new file mode 100644
index 0000000..4baf10f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -0,0 +1,76 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Temporary module loading the core properties and binding the default hasher, Kryo serialization, and distributed
+ * deployment management, until we have a better way to customize node configuration
+ *
+ */
+public class DefaultCoreModule extends AbstractModule {
+
+ private static Logger logger = LoggerFactory.getLogger(DefaultCoreModule.class);
+
+ InputStream coreConfigFileInputStream;
+ private PropertiesConfiguration config;
+
+ String clusterName = null;
+
+ public DefaultCoreModule(InputStream coreConfigFileInputStream) {
+ this.coreConfigFileInputStream = coreConfigFileInputStream;
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ if (coreConfigFileInputStream != null) {
+ try {
+ coreConfigFileInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+ }
+
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(coreConfigFileInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
deleted file mode 100644
index 9003f26..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.s4.core;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.RemoteEmitterFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.ClustersFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
- *
- */
-public class DefaultModule extends AbstractModule {
-
- InputStream configFileInputStream;
- private PropertiesConfiguration config;
-
- public DefaultModule(InputStream configFileInputStream) {
- this.configFileInputStream = configFileInputStream;
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (configFileInputStream != null) {
- try {
- configFileInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- bind(Emitter.class).to(TCPEmitter.class);
-
- bind(Listener.class).to(TCPListener.class);
-
- bind(Assignment.class).to(AssignmentFromZK.class);
-
- bind(Clusters.class).to(ClustersFromZK.class);
- bind(Cluster.class).to(ClusterFromZK.class);
-
- // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
- install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
- RemoteEmitterFactory.class));
- bind(RemoteEmitters.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
- }
-
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(configFileInputStream);
-
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 09a3778..663caed 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -2,14 +2,21 @@ package org.apache.s4.core;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.InputStream;
-import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.util.ParametersInjectionModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
import com.google.common.io.Resources;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -29,121 +36,126 @@ public class Main {
* @param args
*/
public static void main(String[] args) {
- try {
- if (args.length == 0) {
- logger.info("Starting S4 node with default configuration");
- startDefaultS4Node();
- } else if (args.length == 1) {
- logger.info("Starting S4 node with custom configuration from file {}", args[0]);
- startCustomS4Node(args[0]);
- } else {
- logger.info("Starting S4 node in development mode");
- startDevelopmentMode(args);
- }
- } catch (Exception e) {
- logger.error("Cannot start S4 node", e);
- }
- }
- private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
- // TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
- // file passed as a parameter...
- Injector injector = Guice
- .createInjector(new DefaultModule(new FileInputStream(new File(s4PropertiesFilePath))));
- startServer(logger, injector);
- }
-
- private static void startDefaultS4Node() {
- final Logger logger = LoggerFactory.getLogger(Main.class);
-
- /*
- * Need to get name of plugin module. Load ControllerModule to get configuration.
- */
- Injector injector = Guice.createInjector(new org.apache.s4.core.Module());
+ MainArgs mainArgs = new MainArgs();
+ JCommander jc = new JCommander(mainArgs);
- startServer(logger, injector);
- }
-
- private static void startServer(final Logger logger, Injector injector) {
- Server server = injector.getInstance(Server.class);
try {
- server.start(injector);
+ jc.parse(args);
} catch (Exception e) {
- logger.error("Failed to start the controller.", e);
+ JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+ jc.usage();
+ System.exit(1);
}
+
+ startNode(mainArgs);
}
- /**
- * Facility for starting S4 apps by passing a module class and an application class
- *
- * Usage: java <classpath+params> org.apache.s4.core.Main <appClassName> <moduleClassName>
- *
- */
- private static void startDevelopmentMode(String[] args) {
- if (args.length < 2 && args.length > 3) {
- usageForDevelopmentMode(args);
- }
- logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
- Injector injector = null;
+ private static void startNode(MainArgs mainArgs) {
try {
- if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
- logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
- System.exit(-1);
+ Injector injector;
+ InputStream commConfigFileInputStream;
+ InputStream coreConfigFileInputStream;
+ String commConfigString;
+ if (mainArgs.commConfigFilePath == null) {
+ commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+ commConfigString = "default.s4.comm.properties from classpath";
+ } else {
+ commConfigFileInputStream = new FileInputStream(new File(mainArgs.commConfigFilePath));
+ commConfigString = mainArgs.commConfigFilePath;
}
- if (args.length == 3) {
- if (!(new File(args[2]).exists())) {
- logger.error("Cannot find S4 config file {}", args[2]);
- System.exit(-1);
- }
- try {
- injector = Guice.createInjector((AbstractModule) Class.forName(args[0])
- .getConstructor(InputStream.class).newInstance(new FileInputStream(new File(args[2]))));
- } catch (Exception e) {
- logger.error("Module loading error", e);
- System.exit(-1);
- }
+
+ String coreConfigString;
+ if (mainArgs.coreConfigFilePath == null) {
+ coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+ coreConfigString = "default.s4.core.properties from classpath";
} else {
- URL defaultS4Config = null;
- try {
- defaultS4Config = Resources.getResource("default.s4.properties");
- } catch (IllegalArgumentException e) {
- logger.error(
- "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
- e);
- System.exit(-1);
+ coreConfigFileInputStream = new FileInputStream(new File(mainArgs.coreConfigFilePath));
+ coreConfigString = mainArgs.coreConfigFilePath;
+ }
+
+ AbstractModule commModule = (AbstractModule) Class.forName(mainArgs.commModuleClass)
+ .getConstructor(InputStream.class, String.class)
+ .newInstance(commConfigFileInputStream, mainArgs.clusterName);
+ AbstractModule coreModule = (AbstractModule) Class.forName(mainArgs.coreModuleClass)
+ .getConstructor(InputStream.class).newInstance(coreConfigFileInputStream);
+
+ List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+ for (String moduleClass : mainArgs.extraModulesClasses) {
+ extraModules.add((com.google.inject.Module) Class.forName(moduleClass).newInstance());
+ }
+
+ List<com.google.inject.Module> modules = new ArrayList<com.google.inject.Module>();
+ modules.add((com.google.inject.Module) commModule);
+ modules.add((com.google.inject.Module) coreModule);
+ modules.addAll(extraModules);
+
+ logger.info(
+ "Initializing S4 node with : \n- comm module class [{}]\n- comm configuration file [{}]\n- core module class [{}]\n- core configuration file[{}]\n-extra modules: "
+ + Arrays.toString(mainArgs.extraModulesClasses.toArray(new String[] {})), new String[] {
+ mainArgs.commModuleClass, commConfigString, mainArgs.coreModuleClass, coreConfigString });
+
+ if (!mainArgs.extraNamedParameters.isEmpty()) {
+ logger.debug("Adding named parameters for injection : {}",
+ Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})));
+ Map<String, String> namedParameters = new HashMap<String, String>();
+ for (String namedParam : mainArgs.extraNamedParameters) {
+ namedParameters.put(namedParam.split("[:]")[0].trim(), namedParam.split("[:]")[1].trim());
}
+ modules.add(new ParametersInjectionModule(namedParameters));
+ }
+
+ injector = Guice.createInjector(modules);
+ if (mainArgs.appClass != null) {
+ logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
+ App app = (App) injector.getInstance(Class.forName(mainArgs.appClass));
+ app.init();
+ app.start();
+ } else {
+ logger.info("Starting S4 node. This node will automatically download applications published for the cluster it belongs to");
+ Server server = injector.getInstance(Server.class);
try {
- injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).getConstructor(File.class)
- .newInstance(Resources.newInputStreamSupplier(defaultS4Config).getInput()));
+ server.start(injector);
} catch (Exception e) {
- logger.error(
- "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
- e);
- System.exit(-1);
+ logger.error("Failed to start the controller.", e);
}
}
- } catch (ClassNotFoundException e) {
- logger.error("Invalid module class [{}]", args[0], e);
- System.exit(-1);
- }
- App app;
- try {
- app = (App) injector.getInstance(Class.forName(args[1]));
- app.init();
- app.start();
- } catch (ClassNotFoundException e) {
- logger.error("Invalid S4 application class [{}] : {}", args[1], e.getMessage());
+ } catch (Exception e) {
+ logger.error("Cannot start S4 node", e);
}
}
- static void usageForDevelopmentMode(String[] args) {
- logger.info("Invalid parameters "
- + Arrays.toString(args)
- + " \nUsage: java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName>"
- + "\n(this uses default.s4.properties from the classpath)"
- + "\nor:"
- + " java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName> <pathToConfigFile>");
- System.exit(-1);
+ @Parameters(separators = "=")
+ public static class MainArgs {
+
+ @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
+ String clusterName = null;
+
+ @Parameter(names = "-commModuleClass", description = "configuration module class for the communication layer", required = false)
+ String commModuleClass = DefaultCommModule.class.getName();
+
+ @Parameter(names = "-commConfig", description = "s4 communication layer configuration file", required = false)
+ String commConfigFilePath;
+
+ @Parameter(names = "-coreModuleClass", description = "s4-core configuration module class", required = false)
+ String coreModuleClass = DefaultCoreModule.class.getName();
+
+ @Parameter(names = "-coreConfig", description = "s4 core configuration file", required = false)
+ String coreConfigFilePath = null;
+
+ @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
+ String appClass = null;
+
+ @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+ List<String> extraModulesClasses = new ArrayList<String>();
+
+ @Parameter(names = "-namedStringParameters", description = "Guice @Named parameters, used when starting in non dynamic mode, for instance for the adapter. Syntax: '-namedStringParameters=name1:value1,name2:value2 etc...'", hidden = true)
+ List<String> extraNamedParameters = new ArrayList<String>();
+
+ @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
+ String zkConnectionString = "localhost:2181";
+
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 2571e20..bf5f9eb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -43,6 +43,7 @@ public class Receiver implements Runnable {
this.serDeser = serDeser;
thread = new Thread(this, "Receiver");
+ // TODO avoid starting the thread here
thread.start();
streams = new MapMaker().makeMap();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index d44d270..b05ca28 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -9,9 +9,9 @@ import org.apache.s4.base.EventMessage;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.StreamConsumer;
-import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,10 +42,11 @@ public class RemoteSenders {
Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
for (StreamConsumer consumer : consumers) {
+ // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
+ // represented by a single stream consumer
RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
if (sender == null) {
- sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())),
- hasher);
+ sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())), hasher);
// TODO cleanup when remote topologies die
sendersByTopology.put(consumer.getClusterName(), sender);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index fdbd800..3a87648 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -96,24 +96,6 @@ public class Server {
// disabled app loading from local files
- // File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
- // for (File s4rFile : s4rFiles) {
- // loadApp(s4rFile);
- // }
-
- /* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
- for (Map.Entry<String, App> appEntry : apps.entrySet()) {
- logger.info("Initializing app " + appEntry.getValue().getClass().getName());
- appEntry.getValue().init();
- }
-
- for (Map.Entry<String, App> appEntry : apps.entrySet()) {
- logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
- appEntry.getValue().start();
- }
-
- logger.info("Completed local applications startup.");
-
if (deploymentManager != null) {
deploymentManager.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
new file mode 100644
index 0000000..c23afb0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
@@ -0,0 +1,52 @@
+package org.apache.s4.core.adapter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Base class for adapters. For now, it provides facilities for automatically creating an output stream.
+ *
+ */
+public abstract class AdapterApp extends App {
+
+ @Inject
+ @Named(value = "adapter.output.stream")
+ String outputStreamName;
+
+ private RemoteStream remoteStream;
+
+ protected KeyFinder<Event> remoteStreamKeyFinder;
+
+ public RemoteStream getRemoteStream() {
+ return remoteStream;
+ }
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ remoteStream = createOutputStream(outputStreamName, remoteStreamKeyFinder);
+ }
+
+ /**
+ * This method allows specifying a key finder in order to partition the output stream
+ *
+ * @param keyFinder
+ * used for identifying keys from the events
+ */
+ protected void setKeyFinder(KeyFinder<Event> keyFinder) {
+ this.remoteStreamKeyFinder = keyFinder;
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
new file mode 100644
index 0000000..83914bd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
@@ -0,0 +1,28 @@
+package org.apache.s4.core.util;
+
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Injects String parameters from a map. Used for loading parameters outside of config files.
+ *
+ */
+public class ParametersInjectionModule extends AbstractModule {
+
+ Map<String, String> params;
+
+ public ParametersInjectionModule(Map<String, String> params) {
+ this.params = params;
+ }
+
+ @Override
+ protected void configure() {
+ for (Map.Entry<String, String> param : params.entrySet()) {
+ bind(String.class).annotatedWith(Names.named(param.getKey())).toInstance(param.getValue());
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r b/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r
deleted file mode 100644
index b1d0cac..0000000
Binary files a/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/default.s4.core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/default.s4.core.properties b/subprojects/s4-core/src/main/resources/default.s4.core.properties
new file mode 100644
index 0000000..a078f75
--- /dev/null
+++ b/subprojects/s4-core/src/main/resources/default.s4.core.properties
@@ -0,0 +1,2 @@
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/s4-core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/s4-core.properties b/subprojects/s4-core/src/main/resources/s4-core.properties
deleted file mode 100644
index 7db6157..0000000
--- a/subprojects/s4-core/src/main/resources/s4-core.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-comm.module = org.apache.s4.comm.Module
-s4.logger_level = TRACE
-#s4.apps.path = /My/Apps/Dir
-
-
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 7a01992..12f5d2f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.core.triggers.TriggeredApp;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
@@ -44,8 +45,9 @@ public abstract class TriggerTest extends ZkBasedTest {
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(new DefaultModule(Resources.newInputStreamSupplier(
- Resources.getResource("default.s4.properties")).getInput()));
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index f5f4ce4..4eae82b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -15,7 +15,6 @@ import junit.framework.Assert;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -34,7 +33,6 @@ import org.junit.Test;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
-import com.google.common.io.Resources;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -53,7 +51,7 @@ public class TestAutomaticDeployment {
private Factory zookeeperServerConnectionFactory;
private Process forkedNode;
private ZkClient zkClient;
- private String clusterName;
+ private static final String CLUSTER_NAME = "clusterZ";
private HttpServer httpServer;
private static File tmpAppsDir;
@@ -72,43 +70,23 @@ public class TestAutomaticDeployment {
+ tmpAppsDir.getAbsolutePath() });
}
- @Before
- public void cleanLocalAppsDir() throws ConfigurationException, IOException {
- PropertiesConfiguration config = loadConfig();
-
- if (!new File(config.getString("appsDir")).exists()) {
- Assert.assertTrue(new File(config.getString("appsDir")).mkdirs());
- } else {
- if (!config.getString("appsDir").startsWith("/tmp")) {
- Assert.fail("apps dir should a subdir of /tmp for safety");
- }
- CommTestUtils.deleteDirectoryContents(new File(config.getString("appsDir")));
- }
- }
-
- private PropertiesConfiguration loadConfig() throws ConfigurationException, IOException {
- PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
- return config;
- }
-
// ignore this test since now we only deploy from artifacts published through zookeeper
@Test
@Ignore
public void testInitialDeploymentFromFileSystem() throws Exception {
- File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
- + System.currentTimeMillis() + ".s4r");
-
- Assert.assertTrue(ByteStreams.copy(
- Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
- + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
-
- initializeS4Node();
-
- final String uri = s4rToDeploy.toURI().toString();
-
- assertDeployment(uri, true);
+ // File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
+ // + System.currentTimeMillis() + ".s4r");
+ //
+ // Assert.assertTrue(ByteStreams.copy(
+ // Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ // + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+ //
+ // initializeS4Node();
+ //
+ // final String uri = s4rToDeploy.toURI().toString();
+ //
+ // assertDeployment(uri, true);
}
@@ -143,7 +121,7 @@ public class TestAutomaticDeployment {
if (!initial) {
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
}
Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
@@ -181,11 +159,11 @@ public class TestAutomaticDeployment {
ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
- zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp1", record1, CreateMode.PERSISTENT);
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
- zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp2", record2, CreateMode.PERSISTENT);
Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
@@ -282,21 +260,17 @@ public class TestAutomaticDeployment {
// current package .
// 1. start s4 nodes. Check that no app is deployed.
- PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
-
- clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1, 1300);
+ taskSetup.clean(CLUSTER_NAME);
+ taskSetup.setup(CLUSTER_NAME, 1, 1300);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
+ zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -307,10 +281,7 @@ public class TestAutomaticDeployment {
}
});
- File tmpConfig = File.createTempFile("tmp", "config");
- Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
- Files.newOutputStreamSupplier(tmpConfig)) > 0);
- forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+ forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
// TODO synchro with ready state from zk
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index ca42d9d..5062660 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -2,7 +2,6 @@ package org.apache.s4.deploy.prodcon;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -36,7 +35,7 @@ public class TestProducerConsumer {
private Factory zookeeperServerConnectionFactory;
private Process forkedNode;
private ZkClient zkClient;
- private String clusterName;
+ private final static String CLUSTER_NAME = "prodconcluster";
private HttpServer httpServer;
private static File tmpAppsDir;
@@ -92,7 +91,7 @@ public class TestProducerConsumer {
private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
IOException {
PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+ config.load(Resources.getResource("default.s4.core.properties").openStream());
return config;
}
@@ -118,11 +117,11 @@ public class TestProducerConsumer {
ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
- zkClient.create("/s4/clusters/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/showtime", record1, CreateMode.PERSISTENT);
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
- zkClient.create("/s4/clusters/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/counter", record2, CreateMode.PERSISTENT);
// TODO validate test through some Zookeeper notifications
Thread.sleep(10000);
@@ -134,25 +133,17 @@ public class TestProducerConsumer {
// current package .
// 1. start s4 nodes. Check that no app is deployed.
- // PropertiesConfiguration config = new PropertiesConfiguration();
- // config.load(Resources.newInputStreamSupplier(Resources.getResource("/org.apache.s4.deploy.s4.properties"))
- // .getInput());
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(is);
-
- clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.clean("s4");
- taskSetup.setup(clusterName, 1, 1300);
+ taskSetup.setup(CLUSTER_NAME, 1, 1300);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
+ zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -163,10 +154,7 @@ public class TestProducerConsumer {
}
});
- File tmpConfig = File.createTempFile("tmp", "config");
- Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
- Files.newOutputStreamSupplier(tmpConfig)) > 0);
- forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+ forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
// TODO synchro with ready state from zk
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
deleted file mode 100644
index 18aaf9c..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.core.RemoteSenders;
-
-public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
- private final Class<?> appClass;
-
- private Class<?> findAppClass() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- return (Class<?>) fieldArgTypes[0];
- }
-
- protected ZkBasedAppModule() {
- super();
- this.appClass = findAppClass();
- }
-
- protected ZkBasedAppModule(Class<? extends Emitter> emitterClass,
- Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
- super(emitterClass, remoteEmitterClass, listenerClass);
- this.appClass = findAppClass();
- }
-
- @Override
- protected void configure() {
- super.configure();
- bind(appClass);
- bind(RemoteSenders.class);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
index e493385..9907bee 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
@@ -1,7 +1,13 @@
package org.apache.s4.wordcount;
-import org.apache.s4.fixtures.ZkBasedAppModule;
+import com.google.inject.AbstractModule;
-public class WordCountModule extends ZkBasedAppModule<WordCountApp> {
+public class WordCountModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(WordCountApp.class);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 14c7abd..2365a48 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -6,9 +6,8 @@ import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -19,8 +18,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.io.Resources;
-
public class WordCountTest {
public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
@@ -58,12 +55,12 @@ public class WordCountTest {
public void testSimple() throws Exception {
final ZooKeeper zk = CommTestUtils.createZkClient();
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
- PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+ String clusterName = "clusterA";
taskSetup.clean("s4");
- taskSetup.setup(config.getString("cluster.name"), 1, 10000);
+ taskSetup.setup(clusterName, 1, 10000);
- App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
+ Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + WordCountApp.class.getName(),
+ "-extraModulesClasses=" + WordCountModule.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
deleted file mode 100644
index 488bd21..0000000
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.name = cluster1
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-s4.logger_level = DEBUG
-appsDir=/tmp/deploy-test
-tcp.partition.queue_size=1000
-comm.timeout=100
-comm.retry_delay=100
-comm.retries=10
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
index c50e44d..067c551 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
@@ -1,34 +1,32 @@
/*
* Copyright (c) 2011 The S4 Project, http://s4.io.
* All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.example.model;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
public class Main {
-
+
/**
* @param args
*/
public static void main(String[] args) {
- Injector injector = Guice.createInjector(new Module(), new org.apache.s4.comm.Module());
-
- Controller controller = injector.getInstance(Controller.class);
- controller.start();
+ // Injector injector = Guice.createInjector(new DefaultModule());
+ //
+ // Controller controller = injector.getInstance(Controller.class);
+ // controller.start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
index 406d670..e8edd14 100644
--- a/subprojects/s4-tools/s4-tools.gradle
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -40,6 +40,13 @@ dependencies {
apply plugin:'application'
mainClassName = "org.apache.s4.tools.Tools"
+task copyTemplates << {
+ copy {
+ from 'src/main/java/hello'
+ into 'src/main/resources'
+ }
+}
+
applicationDistribution.from(configurations.compile) {
// this is supposed to be done by default but is not anymore in 1.0-rc-3
include "*.jar"
@@ -52,6 +59,7 @@ startScripts {
+
run {
// run doesn't yet directly accept command line parameters...
if ( project.hasProperty('args') ) {