You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[8/22] git commit: improved scripts + added adapter facilities - added a simple runnable project template - added a facility to start an adapter node directly from the current project - added some documentation - improved error handling in tcp emitter

improved scripts + added adapter facilities
- added a simple runnable project template
- added a facility to start an adapter node directly from the current
project
- added some documentation
- improved error handling in tcp emitter


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cad14b40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cad14b40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cad14b40

Branch: refs/heads/piper
Commit: cad14b4035c3a076c614e1f995d2d352ffcf0569
Parents: 99b6f04
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu May 24 20:10:00 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu May 24 20:10:00 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |    2 +-
 s4                                                 |   36 +--
 .../java/org/apache/s4/comm/DefaultCommModule.java |  129 +++++++++
 .../org/apache/s4/comm/RemoteEmitterFactory.java   |   10 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   32 ++-
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |    4 +
 .../org/apache/s4/comm/topology/RemoteStreams.java |    8 +
 .../apache/s4/comm/topology/StreamConsumer.java    |   28 ++
 .../org/apache/s4/comm/udp/UDPRemoteEmitter.java   |   18 ++
 .../src/main/resources/default.s4.comm.properties  |    8 +
 .../s4-comm/src/main/resources/s4-comm.properties  |    9 -
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   14 +-
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   12 +-
 .../ZkBasedClusterManagementTestModule.java        |  101 -------
 .../src/test/resources/default.s4.properties       |    6 -
 .../src/test/resources/udp.s4.comm.properties      |    9 +
 .../src/main/java/org/apache/s4/core/App.java      |   49 +----
 .../java/org/apache/s4/core/DefaultCoreModule.java |   76 ++++++
 .../java/org/apache/s4/core/DefaultModule.java     |   99 -------
 .../src/main/java/org/apache/s4/core/Main.java     |  210 ++++++++-------
 .../src/main/java/org/apache/s4/core/Receiver.java |    1 +
 .../java/org/apache/s4/core/RemoteSenders.java     |    9 +-
 .../src/main/java/org/apache/s4/core/Server.java   |   18 --
 .../org/apache/s4/core/adapter/AdapterApp.java     |   52 ++++
 .../s4/core/util/ParametersInjectionModule.java    |   28 ++
 .../src/main/resources/apps/CounterExample.s4r     |  Bin 53071 -> 0 bytes
 .../src/main/resources/default.s4.core.properties  |    2 +
 .../s4-core/src/main/resources/s4-core.properties  |    5 -
 .../test/java/org/apache/s4/core/TriggerTest.java  |    6 +-
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   71 ++----
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   28 +--
 .../org/apache/s4/fixtures/ZkBasedAppModule.java   |   40 ---
 .../org/apache/s4/wordcount/WordCountModule.java   |   10 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   13 +-
 .../src/test/resources/default.s4.properties       |   12 -
 .../java/org/apache/s4/example/model/Main.java     |   18 +-
 subprojects/s4-tools/s4-tools.gradle               |    8 +
 .../main/java/org/apache/s4/tools/CreateApp.java   |  139 ++++++++++
 .../java/org/apache/s4/tools/DefineCluster.java    |    4 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   17 +-
 .../main/java/org/apache/s4/tools/S4ArgsBase.java  |    3 +
 .../src/main/java/org/apache/s4/tools/Tools.java   |   86 +++++-
 .../src/main/resources/templates/HelloApp.java.txt |   34 +++
 .../resources/templates/HelloInputAdapter.java.txt |   60 ++++
 .../src/main/resources/templates/HelloPE.java.txt  |   29 ++
 .../src/main/resources/templates/build.gradle      |  168 ++++++++++++
 .../s4-tools/src/main/resources/templates/gradlew  |    2 +
 .../src/main/resources/templates/newApp.README     |   37 +++
 .../s4-tools/src/main/resources/templates/s4       |   17 ++
 test-apps/twitter-adapter/build.gradle             |    5 +
 .../s4/example/twitter/TwitterInputAdapter.java    |   15 +-
 test-apps/twitter-counter/build.gradle             |    6 +-
 .../org/apache/s4/example/twitter/TopNTopicPE.java |   23 ++-
 53 files changed, 1193 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 58ff560..b6140ed 100644
--- a/build.gradle
+++ b/build.gradle
@@ -72,7 +72,7 @@ libraries = [
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
-    jcommander:         'com.beust:jcommander:1.23',
+    jcommander:         'com.beust:jcommander:1.25',
     commons_io:         'commons-io:commons-io:2.1',
     gradle_base_services: 'gradle-base-services:gradle-base-services:1.0-rc-3',
     gradle_core: 'gradle-core:gradle-core:1.0-rc-3',

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
index 56745a2..61517a1 100755
--- a/s4
+++ b/s4
@@ -4,29 +4,10 @@
 
 GRADLE=`pwd`/gradlew
 
-case "$1" in
-"deploy")
-	shift
-    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
-;;
-"zkServer")
-	shift
-    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
-;;
-"newCluster")
-	shift
-    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
-;;
-"appNode")
-	shift
-  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
-;;
-"adapterNode")
-	shift
-  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
-;;
-
-esac
+S4_DIR="$( cd -P "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+S4_SCRIPT_PATH="$S4_DIR/s4"
+
+subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
 
 # EXAMPLES
 # deploy:
@@ -35,11 +16,8 @@ esac
 
 # newCluster:
 # # examples:
-#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
-
-# appNode:
-# ./s4 appNode subprojects/s4-core/src/test/resources/default.s4.properties
+#./s4 newCluster -c=s4-test-cluster -flp=11000 -nbTasks=2 ; ./s4 newCluster -c=s4-adapter-cluster -flp=13000 -nbTasks=1
 
-# adapterNode:
-# ./s4 adapterNode -s4Properties=test-apps/twitter-adapter/src/main/resources/s4.properties
+# node:
+# ./s4 node -c=s4-test-cluster
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
new file mode 100644
index 0000000..df38071
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -0,0 +1,129 @@
+package org.apache.s4.comm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+
+/**
+ * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ * 
+ */
+public class DefaultCommModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
+    InputStream commConfigInputStream;
+    private PropertiesConfiguration config;
+    String clusterName;
+
+    /**
+     * 
+     * @param commConfigInputStream
+     *            input stream from a configuration file
+     * @param clusterName
+     *            the name of the cluster to which the current node belongs. If the configuration file already defines
+     *            a cluster.name property, this parameter is ignored.
+     */
+    public DefaultCommModule(InputStream commConfigInputStream, String clusterName) {
+        super();
+        this.commConfigInputStream = commConfigInputStream;
+        this.clusterName = clusterName;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (commConfigInputStream != null) {
+            try {
+                commConfigInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Clusters.class).to(ClustersFromZK.class);
+        bind(Cluster.class).to(ClusterFromZK.class);
+
+        try {
+            Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
+                    .getString("comm.emitter.class"));
+            bind(Emitter.class).to(emitterClass);
+
+            // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+            Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
+                    .getString("comm.emitter.remote.class"));
+            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+                    RemoteEmitterFactory.class));
+            bind(RemoteEmitters.class);
+
+            bind(Listener.class).to((Class<? extends Listener>) Class.forName(config.getString("comm.listener.class")));
+
+        } catch (ClassNotFoundException e) {
+            logger.error("Cannot find class implementation ", e);
+        }
+
+    }
+
+    @SuppressWarnings("serial")
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(commConfigInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+            if (clusterName != null) {
+                if (config.containsKey("cluster.name")) {
+                    logger.warn(
+                            "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
+                            clusterName, config.getProperty("cluster.name"));
+                } else {
+                    Names.bindProperties(binder, new HashMap<String, String>() {
+                        {
+                            put("cluster.name", clusterName);
+                        }
+                    });
+                }
+            }
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
new file mode 100644
index 0000000..3da803d
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.topology.Cluster;
+
+public interface RemoteEmitterFactory {
+
+    RemoteEmitter createRemoteEmitter(Cluster topology);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 623172a..7bff4af 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -1,6 +1,5 @@
 package org.apache.s4.comm.tcp;
 
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.Hashtable;
@@ -10,9 +9,9 @@ import java.util.concurrent.Executors;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
+import org.apache.s4.comm.topology.ClusterNode;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -198,6 +197,8 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
         // the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
         // no?
 
+        // NOTE: might be fixed in S4-7
+
         // */
         // if (!channel.isWritable()) {
         // synchronized (sendLock) {
@@ -241,7 +242,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
         }
 
         if (f.isCancelled()) {
-            logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
+            logger.error("Send I/O was cancelled! " + f.getChannel().getRemoteAddress());
         } else if (!f.isSuccess()) {
             logger.error("Exception on I/O operation", f.getCause());
             logger.error(String.format("I/O on partition %d failed!", partitionId));
@@ -291,22 +292,23 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChange
         @Override
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
             Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+            String target;
             if (partitionId == null) {
-                logger.error("Error on mystery channel!!");
+                target = "unknown channel";
+            } else {
+                target = "channel for partition [" + partitionId + "], target node host ["
+                        + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
+                        + partitionNodeMap.get(partitionId).getPort() + "]";
             }
-            logger.error("Error on channel to partition " + partitionId);
+            logger.error(
+                    "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
+                    target);
 
-            try {
-                throw event.getCause();
-            } catch (ConnectException ce) {
-                logger.error(ce.getMessage(), ce);
-            } catch (Throwable err) {
-                logger.error("Error", err);
-                if (context.getChannel().isOpen()) {
-                    logger.error("Closing channel due to exception");
-                    context.getChannel().close();
-                }
+            if (context.getChannel().isOpen()) {
+                logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
+                context.getChannel().close();
             }
+
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 0458c70..aadcd4d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -48,4 +48,8 @@ public class TaskSetup {
         }
     }
 
+    public void disconnect() {
+        zkclient.close();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 6b5d094..dd93682 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -188,6 +188,13 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
         zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
     }
 
+    /**
+     * Publishes interest in a stream from an application.
+     * 
+     * @param appId
+     * @param clusterName
+     * @param streamName
+     */
     public void addInputStream(int appId, String clusterName, String streamName) {
         lock.lock();
         try {
@@ -198,6 +205,7 @@ public class RemoteStreams implements IZkStateListener, IZkChildListener {
             consumer.putSimpleField("appId", String.valueOf(appId));
             consumer.putSimpleField("clusterName", clusterName);
             try {
+                // NOTE: We create 1 sequential znode per consumer node instance
                 zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
             } catch (Throwable e) {
                 logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index f21213a..ede16a3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -24,4 +24,32 @@ public class StreamConsumer {
         return clusterName;
     }
 
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + appId;
+        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        StreamConsumer other = (StreamConsumer) obj;
+        if (appId != other.appId)
+            return false;
+        if (clusterName == null) {
+            if (other.clusterName != null)
+                return false;
+        } else if (!clusterName.equals(other.clusterName))
+            return false;
+        return true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
new file mode 100644
index 0000000..4fa9aa0
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -0,0 +1,18 @@
+package org.apache.s4.comm.udp;
+
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class UDPRemoteEmitter extends UDPEmitter {
+
+    /**
+     * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
+     * discovered (as remote streams outputs)
+     */
+    @Inject
+    public UDPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
+        super(topology);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
new file mode 100644
index 0000000..07497b2
--- /dev/null
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -0,0 +1,8 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
+comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
+comm.listener.class=org.apache.s4.comm.tcp.TCPListener
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/main/resources/s4-comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/s4-comm.properties b/subprojects/s4-comm/src/main/resources/s4-comm.properties
deleted file mode 100644
index aa0c843..0000000
--- a/subprojects/s4-comm/src/main/resources/s4-comm.properties
+++ /dev/null
@@ -1,9 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-cluster.isCluster = true
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index cb39c1b..268715b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -2,35 +2,39 @@ package org.apache.s4.comm.tcp;
 
 import java.io.IOException;
 
+import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.name.Names;
 
 public class TCPCommTest extends ZkBasedTest {
     DeliveryTestUtil util;
+    public final static String CLUSTER_NAME = "cluster1";
 
     @Before
     public void setup() throws IOException, InterruptedException, KeeperException {
-        Injector injector = Guice.createInjector(new TCPCommTestModule());
+        Injector injector = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
+                new TCPCommTestModule());
         util = injector.getInstance(DeliveryTestUtil.class);
     }
 
-    class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
+    class TCPCommTestModule extends AbstractModule {
         TCPCommTestModule() {
-            super(TCPEmitter.class, TCPRemoteEmitter.class, TCPListener.class);
+
         }
 
         @Override
         protected void configure() {
-            super.configure();
             bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
             bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
             bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index e224b99..8f01a9b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -2,14 +2,17 @@ package org.apache.s4.comm.udp;
 
 import java.io.IOException;
 
+import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.comm.tcp.TCPCommTest;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.name.Names;
@@ -19,18 +22,17 @@ public class UDPCommTest extends ZkBasedTest {
 
     @Before
     public void setup() throws IOException, InterruptedException, KeeperException {
-        Injector injector = Guice.createInjector(new UDPCommTestModule());
+        Injector injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
+                .openStream(), TCPCommTest.CLUSTER_NAME), new UDPCommTestModule());
         util = injector.getInstance(DeliveryTestUtil.class);
     }
 
-    class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
+    class UDPCommTestModule extends AbstractModule {
         UDPCommTestModule() {
-            super(UDPEmitter.class, null, UDPListener.class);
         }
 
         @Override
         protected void configure() {
-            super.configure();
             bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
             bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
             bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
deleted file mode 100644
index a1e7089..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.RemoteEmitterFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.ClustersFromZK;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-public class ZkBasedClusterManagementTestModule extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-
-    private Class<? extends Emitter> emitterClass = null;
-    private Class<? extends RemoteEmitter> remoteEmitterClass = null;
-    private Class<? extends Listener> listenerClass = null;
-
-    protected ZkBasedClusterManagementTestModule() {
-    }
-
-    protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
-            Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
-        this.emitterClass = emitterClass;
-        this.remoteEmitterClass = remoteEmitterClass;
-        this.listenerClass = listenerClass;
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-        bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Clusters.class).to(ClustersFromZK.class);
-        bind(Cluster.class).to(ClusterFromZK.class);
-
-        // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
-        if (this.remoteEmitterClass != null) {
-            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
-                    RemoteEmitterFactory.class));
-        } else {
-            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
-                    RemoteEmitterFactory.class));
-        }
-
-        bind(RemoteEmitters.class);
-
-        if (this.emitterClass != null) {
-            bind(Emitter.class).to(this.emitterClass);
-        } else {
-            bind(Emitter.class).to(TCPEmitter.class);
-        }
-
-        if (this.listenerClass != null) {
-            bind(Listener.class).to(this.listenerClass);
-        } else {
-            bind(Listener.class).to(TCPListener.class);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
deleted file mode 100644
index 6624ff4..0000000
--- a/subprojects/s4-comm/src/test/resources/default.s4.properties
+++ /dev/null
@@ -1,6 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.name = cluster1
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
new file mode 100644
index 0000000..615f114
--- /dev/null
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@ -0,0 +1,9 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+comm.emitter.class = org.apache.s4.comm.udp.UDPEmitter
+comm.emitter.remote.class = org.apache.s4.comm.udp.UDPRemoteEmitter
+comm.listener.class = org.apache.s4.comm.udp.UDPListener
+cluster.name = cluster1
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c6bffd3..935d4a5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -16,7 +16,6 @@
 package org.apache.s4.core;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -26,13 +25,11 @@ import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.App.ClockType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.name.Named;
 
 /*
@@ -384,48 +381,4 @@ public abstract class App {
         return stream != null ? stream.getName() + " " : "null ";
     }
 
-    /**
-     * Facility for starting S4 apps by passing a module class and an application class
-     * 
-     * Usage: java &ltclasspath+params&gt org.apache.s4.core.App &ltappClassName&gt &ltmoduleClassName&gt
-     * 
-     */
-    public static void main(String[] args) {
-        if (args.length != 2) {
-            usage(args);
-        }
-        logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
-        Injector injector = null;
-        try {
-            if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
-                logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
-                System.exit(-1);
-            }
-            injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
-        } catch (InstantiationException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-            System.exit(-1);
-        } catch (IllegalAccessException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-            System.exit(-1);
-        } catch (ClassNotFoundException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-            System.exit(-1);
-        }
-        App app;
-        try {
-            app = (App) injector.getInstance(Class.forName(args[1]));
-            app.init();
-            app.start();
-        } catch (ClassNotFoundException e) {
-            logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
-        }
-    }
-
-    private static void usage(String[] args) {
-        logger.info("Invalid parameters " + Arrays.toString(args)
-                + " \nUsage: java <classpath+params> org.apache.s4.core.App <appClassName> <moduleClassName>");
-        System.exit(-1);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
new file mode 100644
index 0000000..4baf10f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -0,0 +1,76 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Temporary module binding the default hasher, the Kryo serializer/deserializer and distributed deployment management,
+ * until we have a better way to customize node configuration
+ * 
+ */
+public class DefaultCoreModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(DefaultCoreModule.class);
+
+    InputStream coreConfigFileInputStream;
+    private PropertiesConfiguration config;
+
+    String clusterName = null;
+
+    public DefaultCoreModule(InputStream coreConfigFileInputStream) {
+        this.coreConfigFileInputStream = coreConfigFileInputStream;
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (coreConfigFileInputStream != null) {
+            try {
+                coreConfigFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+    }
+
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(coreConfigFileInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
deleted file mode 100644
index 9003f26..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.s4.core;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.RemoteEmitterFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.ClustersFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
- * 
- */
-public class DefaultModule extends AbstractModule {
-
-    InputStream configFileInputStream;
-    private PropertiesConfiguration config;
-
-    public DefaultModule(InputStream configFileInputStream) {
-        this.configFileInputStream = configFileInputStream;
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (configFileInputStream != null) {
-            try {
-                configFileInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        bind(Emitter.class).to(TCPEmitter.class);
-
-        bind(Listener.class).to(TCPListener.class);
-
-        bind(Assignment.class).to(AssignmentFromZK.class);
-
-        bind(Clusters.class).to(ClustersFromZK.class);
-        bind(Cluster.class).to(ClusterFromZK.class);
-
-        // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
-        install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
-                RemoteEmitterFactory.class));
-        bind(RemoteEmitters.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
-    }
-
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(configFileInputStream);
-
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 09a3778..663caed 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -2,14 +2,21 @@ package org.apache.s4.core;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.InputStream;
-import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.util.ParametersInjectionModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
 import com.google.common.io.Resources;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -29,121 +36,126 @@ public class Main {
      * @param args
      */
     public static void main(String[] args) {
-        try {
-            if (args.length == 0) {
-                logger.info("Starting S4 node with default configuration");
-                startDefaultS4Node();
-            } else if (args.length == 1) {
-                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
-                startCustomS4Node(args[0]);
-            } else {
-                logger.info("Starting S4 node in development mode");
-                startDevelopmentMode(args);
-            }
-        } catch (Exception e) {
-            logger.error("Cannot start S4 node", e);
-        }
-    }
 
-    private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
-        // TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
-        // file passed as a parameter...
-        Injector injector = Guice
-                .createInjector(new DefaultModule(new FileInputStream(new File(s4PropertiesFilePath))));
-        startServer(logger, injector);
-    }
-
-    private static void startDefaultS4Node() {
-        final Logger logger = LoggerFactory.getLogger(Main.class);
-
-        /*
-         * Need to get name of plugin module. Load ControllerModule to get configuration.
-         */
-        Injector injector = Guice.createInjector(new org.apache.s4.core.Module());
+        MainArgs mainArgs = new MainArgs();
+        JCommander jc = new JCommander(mainArgs);
 
-        startServer(logger, injector);
-    }
-
-    private static void startServer(final Logger logger, Injector injector) {
-        Server server = injector.getInstance(Server.class);
         try {
-            server.start(injector);
+            jc.parse(args);
         } catch (Exception e) {
-            logger.error("Failed to start the controller.", e);
+            JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+            jc.usage();
+            System.exit(1);
         }
+
+        startNode(mainArgs);
     }
 
-    /**
-     * Facility for starting S4 apps by passing a module class and an application class
-     * 
-     * Usage: java &ltclasspath+params&gt org.apache.s4.core.Main &ltappClassName&gt &ltmoduleClassName&gt
-     * 
-     */
-    private static void startDevelopmentMode(String[] args) {
-        if (args.length < 2 && args.length > 3) {
-            usageForDevelopmentMode(args);
-        }
-        logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
-        Injector injector = null;
+    private static void startNode(MainArgs mainArgs) {
         try {
-            if (!AbstractModule.class.isAssignableFrom(Class.forName(args[0]))) {
-                logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
-                System.exit(-1);
+            Injector injector;
+            InputStream commConfigFileInputStream;
+            InputStream coreConfigFileInputStream;
+            String commConfigString;
+            if (mainArgs.commConfigFilePath == null) {
+                commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+                commConfigString = "default.s4.comm.properties from classpath";
+            } else {
+                commConfigFileInputStream = new FileInputStream(new File(mainArgs.commConfigFilePath));
+                commConfigString = mainArgs.commConfigFilePath;
             }
-            if (args.length == 3) {
-                if (!(new File(args[2]).exists())) {
-                    logger.error("Cannot find S4 config file {}", args[2]);
-                    System.exit(-1);
-                }
-                try {
-                    injector = Guice.createInjector((AbstractModule) Class.forName(args[0])
-                            .getConstructor(InputStream.class).newInstance(new FileInputStream(new File(args[2]))));
-                } catch (Exception e) {
-                    logger.error("Module loading error", e);
-                    System.exit(-1);
-                }
+
+            String coreConfigString;
+            if (mainArgs.coreConfigFilePath == null) {
+                coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+                coreConfigString = "default.s4.core.properties from classpath";
             } else {
-                URL defaultS4Config = null;
-                try {
-                    defaultS4Config = Resources.getResource("default.s4.properties");
-                } catch (IllegalArgumentException e) {
-                    logger.error(
-                            "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
-                            e);
-                    System.exit(-1);
+                coreConfigFileInputStream = new FileInputStream(new File(mainArgs.coreConfigFilePath));
+                coreConfigString = mainArgs.coreConfigFilePath;
+            }
+
+            AbstractModule commModule = (AbstractModule) Class.forName(mainArgs.commModuleClass)
+                    .getConstructor(InputStream.class, String.class)
+                    .newInstance(commConfigFileInputStream, mainArgs.clusterName);
+            AbstractModule coreModule = (AbstractModule) Class.forName(mainArgs.coreModuleClass)
+                    .getConstructor(InputStream.class).newInstance(coreConfigFileInputStream);
+
+            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+            for (String moduleClass : mainArgs.extraModulesClasses) {
+                extraModules.add((com.google.inject.Module) Class.forName(moduleClass).newInstance());
+            }
+
+            List<com.google.inject.Module> modules = new ArrayList<com.google.inject.Module>();
+            modules.add((com.google.inject.Module) commModule);
+            modules.add((com.google.inject.Module) coreModule);
+            modules.addAll(extraModules);
+
+            logger.info(
+                    "Initializing S4 node with : \n- comm module class [{}]\n- comm configuration file [{}]\n- core module class [{}]\n- core configuration file[{}]\n-extra modules: "
+                            + Arrays.toString(mainArgs.extraModulesClasses.toArray(new String[] {})), new String[] {
+                            mainArgs.commModuleClass, commConfigString, mainArgs.coreModuleClass, coreConfigString });
+
+            if (!mainArgs.extraNamedParameters.isEmpty()) {
+                logger.debug("Adding named parameters for injection : {}",
+                        Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})));
+                Map<String, String> namedParameters = new HashMap<String, String>();
+                for (String namedParam : mainArgs.extraNamedParameters) {
+                    namedParameters.put(namedParam.split("[:]")[0].trim(), namedParam.split("[:]")[1].trim());
                 }
+                modules.add(new ParametersInjectionModule(namedParameters));
+            }
+
+            injector = Guice.createInjector(modules);
 
+            if (mainArgs.appClass != null) {
+                logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
+                App app = (App) injector.getInstance(Class.forName(mainArgs.appClass));
+                app.init();
+                app.start();
+            } else {
+                logger.info("Starting S4 node. This node will automatically download applications published for the cluster it belongs to");
+                Server server = injector.getInstance(Server.class);
                 try {
-                    injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).getConstructor(File.class)
-                            .newInstance(Resources.newInputStreamSupplier(defaultS4Config).getInput()));
+                    server.start(injector);
                 } catch (Exception e) {
-                    logger.error(
-                            "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
-                            e);
-                    System.exit(-1);
+                    logger.error("Failed to start the controller.", e);
                 }
             }
-        } catch (ClassNotFoundException e) {
-            logger.error("Invalid module class [{}]", args[0], e);
-            System.exit(-1);
-        }
-        App app;
-        try {
-            app = (App) injector.getInstance(Class.forName(args[1]));
-            app.init();
-            app.start();
-        } catch (ClassNotFoundException e) {
-            logger.error("Invalid S4 application class [{}] : {}", args[1], e.getMessage());
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
         }
     }
 
-    static void usageForDevelopmentMode(String[] args) {
-        logger.info("Invalid parameters "
-                + Arrays.toString(args)
-                + " \nUsage: java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName>"
-                + "\n(this uses default.s4.properties from the classpath)"
-                + "\nor:"
-                + " java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName> <pathToConfigFile>");
-        System.exit(-1);
+    @Parameters(separators = "=")
+    public static class MainArgs {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
+        String clusterName = null;
+
+        @Parameter(names = "-commModuleClass", description = "configuration module class for the communication layer", required = false)
+        String commModuleClass = DefaultCommModule.class.getName();
+
+        @Parameter(names = "-commConfig", description = "s4 communication layer configuration file", required = false)
+        String commConfigFilePath;
+
+        @Parameter(names = "-coreModuleClass", description = "s4-core configuration module class", required = false)
+        String coreModuleClass = DefaultCoreModule.class.getName();
+
+        @Parameter(names = "-coreConfig", description = "s4 core configuration file", required = false)
+        String coreConfigFilePath = null;
+
+        @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
+        String appClass = null;
+
+        @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+        List<String> extraModulesClasses = new ArrayList<String>();
+
+        @Parameter(names = "-namedStringParameters", description = "Guice @Named parameters, used when starting in non dynamic mode, for instance for the adapter. Syntax: '-namedStringParameters=name1:value1,name2:value2 etc...'", hidden = true)
+        List<String> extraNamedParameters = new ArrayList<String>();
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
+        String zkConnectionString = "localhost:2181";
+
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 2571e20..bf5f9eb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -43,6 +43,7 @@ public class Receiver implements Runnable {
         this.serDeser = serDeser;
 
         thread = new Thread(this, "Receiver");
+        // TODO avoid starting the thread here
         thread.start();
 
         streams = new MapMaker().makeMap();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index d44d270..b05ca28 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -9,9 +9,9 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.StreamConsumer;
-import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,10 +42,11 @@ public class RemoteSenders {
 
         Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
         for (StreamConsumer consumer : consumers) {
+            // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
+            // represented by a single stream consumer
             RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
             if (sender == null) {
-                sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())),
-                        hasher);
+                sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())), hasher);
                 // TODO cleanup when remote topologies die
                 sendersByTopology.put(consumer.getClusterName(), sender);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index fdbd800..3a87648 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -96,24 +96,6 @@ public class Server {
 
         // disabled app loading from local files
 
-        // File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
-        // for (File s4rFile : s4rFiles) {
-        // loadApp(s4rFile);
-        // }
-
-        /* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
-        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
-            logger.info("Initializing app " + appEntry.getValue().getClass().getName());
-            appEntry.getValue().init();
-        }
-
-        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
-            logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
-            appEntry.getValue().start();
-        }
-
-        logger.info("Completed local applications startup.");
-
         if (deploymentManager != null) {
             deploymentManager.start();
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
new file mode 100644
index 0000000..c23afb0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterApp.java
@@ -0,0 +1,52 @@
+package org.apache.s4.core.adapter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Base class for adapters. For now, it provides facilities for automatically creating an output stream.
+ * 
+ */
+public abstract class AdapterApp extends App {
+
+    @Inject
+    @Named(value = "adapter.output.stream")
+    String outputStreamName;
+
+    private RemoteStream remoteStream;
+
+    protected KeyFinder<Event> remoteStreamKeyFinder;
+
+    public RemoteStream getRemoteStream() {
+        return remoteStream;
+    }
+
+    @Override
+    protected void onStart() {
+    }
+
+    @Override
+    protected void onInit() {
+        remoteStream = createOutputStream(outputStreamName, remoteStreamKeyFinder);
+    }
+
+    /**
+     * This method allows specifying a key finder in order to partition the output stream
+     * 
+     * @param keyFinder
+     *            used for identifying keys from the events
+     */
+    protected void setKeyFinder(KeyFinder<Event> keyFinder) {
+        this.remoteStreamKeyFinder = keyFinder;
+    }
+
+    @Override
+    protected void onClose() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
new file mode 100644
index 0000000..83914bd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
@@ -0,0 +1,28 @@
+package org.apache.s4.core.util;
+
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Injects String parameters from a map. Used for loading parameters outside of config files.
+ * 
+ */
+public class ParametersInjectionModule extends AbstractModule {
+
+    Map<String, String> params;
+
+    public ParametersInjectionModule(Map<String, String> params) {
+        this.params = params;
+    }
+
+    @Override
+    protected void configure() {
+        for (Map.Entry<String, String> param : params.entrySet()) {
+            bind(String.class).annotatedWith(Names.named(param.getKey())).toInstance(param.getValue());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r b/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r
deleted file mode 100644
index b1d0cac..0000000
Binary files a/subprojects/s4-core/src/main/resources/apps/CounterExample.s4r and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/default.s4.core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/default.s4.core.properties b/subprojects/s4-core/src/main/resources/default.s4.core.properties
new file mode 100644
index 0000000..a078f75
--- /dev/null
+++ b/subprojects/s4-core/src/main/resources/default.s4.core.properties
@@ -0,0 +1,2 @@
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/main/resources/s4-core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/s4-core.properties b/subprojects/s4-core/src/main/resources/s4-core.properties
deleted file mode 100644
index 7db6157..0000000
--- a/subprojects/s4-core/src/main/resources/s4-core.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-comm.module = org.apache.s4.comm.Module
-s4.logger_level = TRACE
-#s4.apps.path = /My/Apps/Dir
-
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 7a01992..12f5d2f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
+import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.core.triggers.TriggeredApp;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
@@ -44,8 +45,9 @@ public abstract class TriggerTest extends ZkBasedTest {
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(new DefaultModule(Resources.newInputStreamSupplier(
-                Resources.getResource("default.s4.properties")).getInput()));
+        Injector injector = Guice.createInjector(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index f5f4ce4..4eae82b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -15,7 +15,6 @@ import junit.framework.Assert;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -34,7 +33,6 @@ import org.junit.Test;
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
-import com.google.common.io.Resources;
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -53,7 +51,7 @@ public class TestAutomaticDeployment {
     private Factory zookeeperServerConnectionFactory;
     private Process forkedNode;
     private ZkClient zkClient;
-    private String clusterName;
+    private static final String CLUSTER_NAME = "clusterZ";
     private HttpServer httpServer;
     private static File tmpAppsDir;
 
@@ -72,43 +70,23 @@ public class TestAutomaticDeployment {
                 + tmpAppsDir.getAbsolutePath() });
     }
 
-    @Before
-    public void cleanLocalAppsDir() throws ConfigurationException, IOException {
-        PropertiesConfiguration config = loadConfig();
-
-        if (!new File(config.getString("appsDir")).exists()) {
-            Assert.assertTrue(new File(config.getString("appsDir")).mkdirs());
-        } else {
-            if (!config.getString("appsDir").startsWith("/tmp")) {
-                Assert.fail("apps dir should a subdir of /tmp for safety");
-            }
-            CommTestUtils.deleteDirectoryContents(new File(config.getString("appsDir")));
-        }
-    }
-
-    private PropertiesConfiguration loadConfig() throws ConfigurationException, IOException {
-        PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
-        return config;
-    }
-
     // ignore this test since now we only deploy from artifacts published through zookeeper
     @Test
     @Ignore
     public void testInitialDeploymentFromFileSystem() throws Exception {
 
-        File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
-                + System.currentTimeMillis() + ".s4r");
-
-        Assert.assertTrue(ByteStreams.copy(
-                Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
-                        + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
-
-        initializeS4Node();
-
-        final String uri = s4rToDeploy.toURI().toString();
-
-        assertDeployment(uri, true);
+        // File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
+        // + System.currentTimeMillis() + ".s4r");
+        //
+        // Assert.assertTrue(ByteStreams.copy(
+        // Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+        // + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+        //
+        // initializeS4Node();
+        //
+        // final String uri = s4rToDeploy.toURI().toString();
+        //
+        // assertDeployment(uri, true);
 
     }
 
@@ -143,7 +121,7 @@ public class TestAutomaticDeployment {
         if (!initial) {
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+            zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
         }
 
         Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
@@ -181,11 +159,11 @@ public class TestAutomaticDeployment {
 
         ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
         record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
-        zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp1", record1, CreateMode.PERSISTENT);
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
         record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
-        zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp2", record2, CreateMode.PERSISTENT);
 
         Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
@@ -282,21 +260,17 @@ public class TestAutomaticDeployment {
         // current package .
 
         // 1. start s4 nodes. Check that no app is deployed.
-        PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
-
-        clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1, 1300);
+        taskSetup.clean(CLUSTER_NAME);
+        taskSetup.setup(CLUSTER_NAME, 1, 1300);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -307,10 +281,7 @@ public class TestAutomaticDeployment {
             }
         });
 
-        File tmpConfig = File.createTempFile("tmp", "config");
-        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
-                Files.newOutputStreamSupplier(tmpConfig)) > 0);
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
 
         // TODO synchro with ready state from zk
         Thread.sleep(10000);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index ca42d9d..5062660 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -2,7 +2,6 @@ package org.apache.s4.deploy.prodcon;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -36,7 +35,7 @@ public class TestProducerConsumer {
     private Factory zookeeperServerConnectionFactory;
     private Process forkedNode;
     private ZkClient zkClient;
-    private String clusterName;
+    private final static String CLUSTER_NAME = "prodconcluster";
     private HttpServer httpServer;
     private static File tmpAppsDir;
 
@@ -92,7 +91,7 @@ public class TestProducerConsumer {
     private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
             IOException {
         PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+        config.load(Resources.getResource("default.s4.core.properties").openStream());
         return config;
     }
 
@@ -118,11 +117,11 @@ public class TestProducerConsumer {
 
         ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
-        zkClient.create("/s4/clusters/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/showtime", record1, CreateMode.PERSISTENT);
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
-        zkClient.create("/s4/clusters/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/counter", record2, CreateMode.PERSISTENT);
 
         // TODO validate test through some Zookeeper notifications
         Thread.sleep(10000);
@@ -134,25 +133,17 @@ public class TestProducerConsumer {
         // current package .
 
         // 1. start s4 nodes. Check that no app is deployed.
-        // PropertiesConfiguration config = new PropertiesConfiguration();
-        // config.load(Resources.newInputStreamSupplier(Resources.getResource("/org.apache.s4.deploy.s4.properties"))
-        // .getInput());
-        InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-        PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(is);
-
-        clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
         taskSetup.clean("s4");
-        taskSetup.setup(clusterName, 1, 1300);
+        taskSetup.setup(CLUSTER_NAME, 1, 1300);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -163,10 +154,7 @@ public class TestProducerConsumer {
             }
         });
 
-        File tmpConfig = File.createTempFile("tmp", "config");
-        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
-                Files.newOutputStreamSupplier(tmpConfig)) > 0);
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
 
         // TODO synchro with ready state from zk
         Thread.sleep(10000);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
deleted file mode 100644
index 18aaf9c..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.core.RemoteSenders;
-
-public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
-    private final Class<?> appClass;
-
-    private Class<?> findAppClass() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        return (Class<?>) fieldArgTypes[0];
-    }
-
-    protected ZkBasedAppModule() {
-        super();
-        this.appClass = findAppClass();
-    }
-
-    protected ZkBasedAppModule(Class<? extends Emitter> emitterClass,
-            Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
-        super(emitterClass, remoteEmitterClass, listenerClass);
-        this.appClass = findAppClass();
-    }
-
-    @Override
-    protected void configure() {
-        super.configure();
-        bind(appClass);
-        bind(RemoteSenders.class);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
index e493385..9907bee 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
@@ -1,7 +1,13 @@
 package org.apache.s4.wordcount;
 
-import org.apache.s4.fixtures.ZkBasedAppModule;
+import com.google.inject.AbstractModule;
 
-public class WordCountModule extends ZkBasedAppModule<WordCountApp> {
+public class WordCountModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(WordCountApp.class);
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 14c7abd..2365a48 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -6,9 +6,8 @@ import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
 
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -19,8 +18,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.io.Resources;
-
 public class WordCountTest {
 
     public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
@@ -58,12 +55,12 @@ public class WordCountTest {
     public void testSimple() throws Exception {
         final ZooKeeper zk = CommTestUtils.createZkClient();
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
-        PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+        String clusterName = "clusterA";
         taskSetup.clean("s4");
-        taskSetup.setup(config.getString("cluster.name"), 1, 10000);
+        taskSetup.setup(clusterName, 1, 10000);
 
-        App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
+        Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + WordCountApp.class.getName(),
+                "-extraModulesClasses=" + WordCountModule.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
deleted file mode 100644
index 488bd21..0000000
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.name = cluster1
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-s4.logger_level = DEBUG
-appsDir=/tmp/deploy-test
-tcp.partition.queue_size=1000
-comm.timeout=100
-comm.retry_delay=100
-comm.retries=10

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
index c50e44d..067c551 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/Main.java
@@ -1,34 +1,32 @@
 /*
  * Copyright (c) 2011 The S4 Project, http://s4.io.
  * All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.example.model;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 public class Main {
-    
+
     /**
      * @param args
      */
     public static void main(String[] args) {
 
-        Injector injector = Guice.createInjector(new Module(), new org.apache.s4.comm.Module());
-
-        Controller controller = injector.getInstance(Controller.class);
-        controller.start();
+        // Injector injector = Guice.createInjector(new DefaultModule());
+        //
+        // Controller controller = injector.getInstance(Controller.class);
+        // controller.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
index 406d670..e8edd14 100644
--- a/subprojects/s4-tools/s4-tools.gradle
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -40,6 +40,13 @@ dependencies {
 apply plugin:'application'
 mainClassName = "org.apache.s4.tools.Tools"
 
+task copyTemplates << {
+    copy {
+        from 'src/main/java/hello'
+        into 'src/main/resources'
+    }
+}
+
 applicationDistribution.from(configurations.compile) {
     // this is supposed to be done by default but is not anymore in 1.0-rc-3
     include "*.jar"
@@ -52,6 +59,7 @@ startScripts {
 
 
 
+
 run {
     // run doesn't yet directly accept command line parameters...
     if ( project.hasProperty('args') ) {