You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/05 12:26:52 UTC

git commit: Fix dependency injection issues

Updated Branches:
  refs/heads/S4-110-new 55f8c2157 -> 24a11e18b


Fix dependency injection issues

- allow for a clear separation of the bootstrap layer (i.e. process + connectivity to
cluster manager) from platform configuration (comm and core bindings) and application code

- unfortunately there is no communication between nodes now.
Main issue relates to partitioning based on streams: the Emitter implementations need
to have the information about where to send messages. It used to be a simple mapping
partitionID -> node, but by adding streams it gets more complex. One option would be to
have 1 emitter per stream. Another is to resolve mappings in the sender.
These solutions imply many changes, so I wonder whether we could first have a version
without variable partitions per stream?


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/24a11e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/24a11e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/24a11e18

Branch: refs/heads/S4-110-new
Commit: 24a11e18b35436c5d60dfe07dc6fe81565fd91f4
Parents: 55f8c21
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Feb 5 13:03:10 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Feb 5 13:18:58 2013 +0100

----------------------------------------------------------------------
 .../src/main/resources/default.s4.base.properties  |    3 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |    6 +-
 .../org/apache/s4/comm/HelixBasedCommModule.java   |  109 +----------
 .../apache/s4/comm/topology/ClusterFromHelix.java  |   13 +-
 .../s4/comm/topology/HelixRemoteStreams.java       |   24 +++
 .../main/java/org/apache/s4/core/BaseModule.java   |   31 ++--
 .../org/apache/s4/core/HelixBasedCoreModule.java   |   71 +-------
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |   14 +-
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |  157 +++++++--------
 .../src/main/java/org/apache/s4/core/S4Node.java   |    9 +-
 .../src/main/java/org/apache/s4/core/Server.java   |    8 +-
 .../java/org/apache/s4/core/util/AppConfig.java    |   24 +++
 .../java/org/apache/s4/deploy/AppStateModel.java   |  139 ++++++++-----
 .../org/apache/s4/deploy/AppStateModelFactory.java |   11 +-
 .../s4/deploy/HelixBasedDeploymentManager.java     |   21 +-
 .../src/main/resources/default.s4.core.properties  |    2 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |    2 +-
 .../java/org/apache/s4/tools/helix/DeployApp.java  |   56 +++++-
 18 files changed, 318 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-base/src/main/resources/default.s4.base.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/resources/default.s4.base.properties b/subprojects/s4-base/src/main/resources/default.s4.base.properties
index 8acb17c..681232e 100644
--- a/subprojects/s4-base/src/main/resources/default.s4.base.properties
+++ b/subprojects/s4-base/src/main/resources/default.s4.base.properties
@@ -1,4 +1,5 @@
 # default properties for bootstraping a node
 s4.cluster.zk_address = localhost:2181
 s4.cluster.zk_session_timeout = 10000
-s4.cluster.zk_connection_timeout = 10000
\ No newline at end of file
+s4.cluster.zk_connection_timeout = 10000
+s4.logger_level = DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index c557413..a1d6319 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -33,8 +33,6 @@ import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
 import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
 import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.ClustersFromZK;
 import org.slf4j.Logger;
@@ -90,10 +88,8 @@ public class DefaultCommModule extends AbstractModule {
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
 
-        bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
-
+        // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
         bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
-
         bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 980f9bd..b2e92c7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,48 +1,18 @@
 package org.apache.s4.comm;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
-import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromHelix;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromHelix;
-import org.apache.s4.comm.topology.ClusterFromZK;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.ClustersFromHelix;
-import org.apache.s4.comm.topology.ClustersFromZK;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Scopes;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 public class HelixBasedCommModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
-    InputStream commConfigInputStream;
-    private PropertiesConfiguration config;
-    String clusterName;
 
     /**
      * 
@@ -52,96 +22,19 @@ public class HelixBasedCommModule extends AbstractModule {
      *            the name of the cluster to which the current node belongs. If specified in the configuration file,
      *            this parameter will be ignored.
      */
-    public HelixBasedCommModule(InputStream commConfigInputStream, String clusterName) {
-        super();
-        this.commConfigInputStream = commConfigInputStream;
-        this.clusterName = clusterName;
+    public HelixBasedCommModule() {
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (commConfigInputStream != null) {
-            try {
-                commConfigInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-        /* Use Kryo to serialize events. */
-        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
-                SerializerDeserializerFactory.class));
 
         // a node holds a single partition assignment
         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
         bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
                 TaskStateModelFactory.class);
-        
-        bind(Assignment.class).to(AssignmentFromHelix.class);
-        
-        bind(Cluster.class).to(ClusterFromHelix.class);
 
         bind(Clusters.class).to(ClustersFromHelix.class);
 
-        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
-
-        bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
-        
-        try {
-            Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
-                    .getString("s4.comm.emitter.class"));
-            bind(Emitter.class).to(emitterClass);
-
-            // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
-            Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
-                    .getString("s4.comm.emitter.remote.class"));
-            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
-                    RemoteEmitterFactory.class));
-            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
-
-//            bind(Listener.class).to(
-//                    (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
-
-        } catch (ClassNotFoundException e) {
-            logger.error("Cannot find class implementation ", e);
-        }
-
-    }
-
-    @SuppressWarnings("serial")
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(commConfigInputStream);
-
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
-            if (clusterName != null) {
-                if (config.containsKey("s4.cluster.name")) {
-                    logger.warn(
-                            "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
-                            clusterName, config.getProperty("s4.cluster.name"));
-                } else {
-                    Names.bindProperties(binder, new HashMap<String, String>() {
-                        {
-                            put("s4.cluster.name", clusterName);
-                        }
-                    });
-                }
-            }
-
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 19e0628..7c49ac0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -27,23 +27,21 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
 
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
@@ -114,6 +112,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
                 }
             }
             partitionCountMapRef.set(map);
+
             for (ClusterChangeListener listener : listeners) {
                 listener.onChange();
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
new file mode 100644
index 0000000..b6b4558
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
@@ -0,0 +1,24 @@
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class HelixRemoteStreams implements RemoteStreams {
+
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        // TODO implement?
+        return Collections.emptySet();
+    }
+
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+        // TODO implement?
+    }
+
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+        // TODO implement?
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index b9679e7..171c564 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -6,16 +6,15 @@ import java.util.HashMap;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromHelix;
-import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterFromHelix;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.HelixBasedDeploymentManager;
+import org.apache.s4.deploy.AppStateModelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,11 +30,10 @@ public class BaseModule extends AbstractModule {
     private PropertiesConfiguration config;
     InputStream baseConfigInputStream;
     String clusterName;
-    private String instanceName;
+    private final String instanceName;
     boolean useHelix = true;
 
-    public BaseModule(InputStream baseConfigInputStream, String clusterName,
-            String instanceName) {
+    public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName) {
         super();
         this.baseConfigInputStream = baseConfigInputStream;
         this.clusterName = clusterName;
@@ -48,32 +46,31 @@ public class BaseModule extends AbstractModule {
             loadProperties(binder());
         }
         if (useHelix) {
-            bind(Assignment.class).to(AssignmentFromHelix.class)
-                    .asEagerSingleton();
+            bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+            bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
             bind(Cluster.class).to(ClusterFromHelix.class);
-            bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+            bind(TaskStateModelFactory.class);
+            bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
+            // bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
             bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
             bind(Bootstrap.class).to(S4HelixBootstrap.class);
 
             // share the Zookeeper connection
-            bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
-                    Scopes.SINGLETON);
             return;
         }
         // a node holds a single partition assignment
         // ==> Assignment is a singleton so it shared between base, comm and app
         // layers.
         // it is eager so that the node is able to join a cluster immediately
-        bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+        // bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
         // bind(Cluster.class).to(ClusterFromZK.class);
 
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
         bind(Bootstrap.class).to(S4Bootstrap.class);
 
         // share the Zookeeper connection
-        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
-                Scopes.SINGLETON);
+        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
 
     }
 
@@ -86,8 +83,7 @@ public class BaseModule extends AbstractModule {
             // TODO - validate properties.
 
             /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder,
-                    ConfigurationConverter.getProperties(config));
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
 
             if (clusterName != null) {
                 if (config.containsKey("s4.cluster.name")) {
@@ -106,8 +102,7 @@ public class BaseModule extends AbstractModule {
                 if (config.containsKey("s4.instance.name")) {
                     logger.warn(
                             "instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
-                            instanceName,
-                            config.getProperty("s4.instance.name"));
+                            instanceName, config.getProperty("s4.instance.name"));
                 } else {
                     Names.bindProperties(binder, new HashMap<String, String>() {
                         {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 0ce80ed..01f086c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,38 +19,19 @@
 package org.apache.s4.core;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
 
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.HelixRemoteStreams;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.ZkRemoteStreams;
-import org.apache.s4.core.ft.CheckpointingFramework;
-import org.apache.s4.core.ft.NoOpCheckpointingFramework;
-import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.SenderExecutorServiceFactory;
-import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.s4.deploy.HelixBasedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.name.Named;
-import com.google.inject.name.Names;
 
 /**
  * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -61,45 +42,15 @@ public class HelixBasedCoreModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
 
-    InputStream coreConfigFileInputStream;
-    private PropertiesConfiguration config;
-
-    String clusterName = null;
-
-    public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
-        this.coreConfigFileInputStream = coreConfigFileInputStream;
+    public HelixBasedCoreModule() {
     }
 
     @Override
     protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (coreConfigFileInputStream != null) {
-            try {
-                coreConfigFileInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
 
         bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
-        bind(S4RLoaderFactory.class);
-
-        // For enabling checkpointing, one needs to use a custom module, such as
-        // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
-        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
-
-        // shed load in local sender only by default
-        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
-        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
-
-        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
-
-        bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+        bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
         bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
 
     }
@@ -115,20 +66,4 @@ public class HelixBasedCoreModule extends AbstractModule {
         return tmpS4Dir;
     }
 
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(coreConfigFileInputStream);
-
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 61f6b7f..b3a5f3d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -24,6 +24,7 @@ import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.s4.deploy.DeploymentManager;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
  * 
  * 
  */
-public class S4Bootstrap implements Bootstrap{
+public class S4Bootstrap implements Bootstrap {
     private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
 
     private final ZkClient zkClient;
@@ -90,6 +91,7 @@ public class S4Bootstrap implements Bootstrap{
         zkClient.subscribeDataChanges(appPath, new AppChangeListener());
     }
 
+    @Override
     public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
         this.parentInjector = parentInjector;
         if (zkClient.exists(appPath)) {
@@ -112,7 +114,7 @@ public class S4Bootstrap implements Bootstrap{
         List<File> modulesLocalCopies = new ArrayList<File>();
 
         for (String uriString : appConfig.getCustomModulesURIs()) {
-            modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+            modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString, fetcher));
         }
         final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
 
@@ -129,7 +131,8 @@ public class S4Bootstrap implements Bootstrap{
 
     }
 
-    private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+    public static File fetchModuleAndCopyToLocalFile(String appName, String uriString, ArchiveFetcher fetcher)
+            throws ArchiveFetchException {
 
         URI uri;
         try {
@@ -208,7 +211,10 @@ public class S4Bootstrap implements Bootstrap{
                     logger.info("S4 node in standby until app class or app URI is specified");
                 }
                 Server server = injector.getInstance(Server.class);
-                server.start(injector);
+                server.setInjector(injector);
+                DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
+                deploymentManager.deploy(appConfig);
+                // server.start(injector);
             }
         } catch (Exception e) {
             logger.error("Cannot start S4 node", e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index d20c962..3944971 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,51 +1,29 @@
 package org.apache.s4.core;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.s4.base.util.ModulesLoader;
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.ModulesLoaderFactory;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParametersInjectionModule;
 import org.apache.s4.deploy.AppStateModelFactory;
-import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
-import com.google.inject.Module;
 import com.google.inject.name.Named;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
 
 /**
  * This is the bootstrap for S4 nodes.
@@ -56,84 +34,85 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
  * <li>look for application deployed on the S4 cluster
  * </ul>
  * <p>
- * When an application is available, custom modules are fetched if necessary and
- * a full-featured S4 node is started. The application code is then downloaded
- * and the app started.
+ * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+ * application code is then downloaded and the app started.
  * <p>
- * For testing purposes, it is also possible to directly start an application
- * without fetching remote code, provided the application classes are available
- * in the classpath.
+ * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+ * application classes are available in the classpath.
  * 
  * 
  * 
  */
-public class S4HelixBootstrap implements Bootstrap{
-	private static Logger logger = LoggerFactory
-			.getLogger(S4HelixBootstrap.class);
+public class S4HelixBootstrap implements Bootstrap {
+    private static Logger logger = LoggerFactory.getLogger(S4HelixBootstrap.class);
 
-	private final AtomicBoolean deployed = new AtomicBoolean(false);
+    private final AtomicBoolean deployed = new AtomicBoolean(false);
 
-	private final ArchiveFetcher fetcher;
+    private final ArchiveFetcher fetcher;
 
-	private Injector parentInjector;
+    // private Injector parentInjector;
 
-	CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+    CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
 
-	private String clusterName;
+    private final String clusterName;
 
-	private String instanceName;
+    private final String instanceName;
+
+    private final String zookeeperAddress;
+    private final TaskStateModelFactory taskStateModelFactory;
+
+    private final AppStateModelFactory appStateModelFactory;
+
+    private final Cluster cluster;
+
+    private final Lock startingNode = new ReentrantLock();
+
+    public static Injector rootInjector;
 
-	private String zookeeperAddress;
-    @Inject
-    private TaskStateModelFactory taskStateModelFactory;
-    
-    @Inject
-    private AppStateModelFactory appStateModelFactory;
-    
     @Inject
-    private Cluster cluster;
-
-	@Inject
-	public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.instance.name") String instanceName,
-			@Named("s4.cluster.zk_address") String zookeeperAddress,
-			@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-			@Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
-			ArchiveFetcher fetcher) {
+    public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
+            AppStateModelFactory appStateModelFactory, TaskStateModelFactory taskStateModelFactory,
+            ArchiveFetcher fetcher, Cluster cluster) {
         this.clusterName = clusterName;
         this.instanceName = instanceName;
         this.zookeeperAddress = zookeeperAddress;
-		this.fetcher = fetcher;
+        this.taskStateModelFactory = taskStateModelFactory;
+        this.appStateModelFactory = appStateModelFactory;
+        this.fetcher = fetcher;
+        this.cluster = cluster;
+    }
+
+    @Override
+    public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException, UnknownHostException {
+
+        // start a HelixController to manage the cluster
+        // TODO set this as optional (small clusters only)
+        String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
+        HelixControllerMain.startHelixController(zookeeperAddress, clusterName, controllerName,
+                HelixControllerMain.STANDALONE);
+        // this.parentInjector = parentInjector;
+        S4HelixBootstrap.rootInjector = parentInjector;
         registerWithHelix();
-	}
-
-	public void start(Injector parentInjector) throws InterruptedException,
-			ArchiveFetchException {
-		this.parentInjector = parentInjector;
-		if (!deployed.get()) {
-
-		}
-		signalOneAppLoaded.await();
-	}
-	
-	private void registerWithHelix()
-    {
-      HelixManager helixManager;
-      try
-      {
-        helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
-            instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
-        helixManager.getStateMachineEngine().registerStateModelFactory(
-          "LeaderStandby", taskStateModelFactory);
-        helixManager.getStateMachineEngine().registerStateModelFactory(
-          "OnlineOffline", appStateModelFactory);
-        helixManager.connect();  
-        helixManager.addExternalViewChangeListener((RoutingTableProvider)cluster);
-      } catch (Exception e)
-      {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+
+        signalOneAppLoaded.await();
+    }
+
+    // Joins the Helix cluster as a PARTICIPANT: registers both state model factories, connects, then subscribes.
+    private void registerWithHelix() {
+        try {
+            HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                    InstanceType.PARTICIPANT, zookeeperAddress);
+            helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
+            helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
+            helixManager.connect();
+            helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
+        } catch (Exception e) {
+            // Report through the class logger (stack trace included) rather than printing to stderr
+            logger.error("Cannot register instance [" + instanceName + "] in cluster [" + clusterName + "]", e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index db84c35..ab2be80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -3,7 +3,6 @@ package org.apache.s4.core;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 
-import org.apache.s4.comm.util.ArchiveFetchException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +50,11 @@ public class S4Node {
 
         Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
                 "default.s4.base.properties").openStream(), nodeArgs.clusterName, nodeArgs.instanceName) });
-        S4Bootstrap bootstrap = injector.getInstance(S4Bootstrap.class);
+        Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
         try {
             bootstrap.start(injector);
-        } catch (ArchiveFetchException e1) {
-            logger.error("Cannot fetch module dependencies.", e1);
+        } catch (Exception e1) {
+            logger.error("Cannot start node ", e1);
         }
     }
 
@@ -74,7 +73,7 @@ public class S4Node {
 
         @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
         String zkConnectionString;
-        
+
         @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
         String instanceName = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index d344698..c5917b7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -27,7 +27,7 @@ import java.util.jar.JarFile;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.util.S4RLoader;
 import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.DeploymentManager;
 import org.slf4j.Logger;
@@ -57,7 +57,7 @@ public class Server {
     private DeploymentManager deploymentManager;
 
     @Inject
-    private AssignmentFromZK assignment;
+    private Assignment assignment;
 
     private final ZkClient zkClient;
 
@@ -75,6 +75,10 @@ public class Server {
         zkClient.setZkSerializer(new ZNRecordSerializer());
     }
 
+    public void setInjector(Injector injector) {
+        this.injector = injector;
+    }
+
     public void start(Injector injector) throws Exception {
 
         this.injector = injector;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
index a31ff17..5b9b0e7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
@@ -7,6 +7,8 @@ import java.util.Map;
 
 import org.apache.s4.comm.topology.ZNRecord;
 
+import com.beust.jcommander.internal.Maps;
+
 public class AppConfig {
 
     public static final String NAMED_PARAMETERS = "namedParams";
@@ -70,6 +72,10 @@ public class AppConfig {
         return namedParameters;
     }
 
+    public boolean isValid() {
+        return (appClassName != null || appURI != null) && appName != null;
+    }
+
     public String getNamedParametersAsString() {
         if (namedParameters == null || namedParameters.isEmpty()) {
             return "";
@@ -104,6 +110,24 @@ public class AppConfig {
         return record;
     }
 
+    public Map<String, String> asMap() {
+        Map<String, String> result = Maps.newHashMap();
+        result.put(APP_NAME, appName);
+        result.put(APP_CLASS, appClassName); // consumers of this map look the app class up under APP_CLASS
+        result.put(APP_URI, appURI);
+        StringBuilder moduleNames = new StringBuilder(), moduleURIs = new StringBuilder();
+        for (String customModuleName : customModulesNames) {
+            moduleNames.append(customModuleName).append(',');
+        }
+        result.put(MODULES_CLASSES, moduleNames.toString());
+        for (String customModulesURI : customModulesURIs) {
+            moduleURIs.append(customModulesURI).append(',');
+        }
+        result.put(MODULES_URIS, moduleURIs.toString());
+        result.put(NAMED_PARAMETERS, getNamedParametersAsString());
+        return result;
+    }
+
     @Override
     public String toString() {
         return "app name: [" + appName + "] \n " + "app class: [" + appClassName + "] \n" + "app URI : [" + appURI

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 5f21f51..b70c27f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -1,9 +1,10 @@
 package org.apache.s4.deploy;
 
+import java.io.File;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigScope;
@@ -14,60 +15,96 @@ import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
+import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.comm.ModulesLoaderFactory;
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.S4Bootstrap;
+import org.apache.s4.core.S4HelixBootstrap;
 import org.apache.s4.core.util.AppConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.inject.Injector;
+
 @StateModelInfo(states = { "ONLINE,OFFLINE" }, initialState = "OFFLINE")
 public class AppStateModel extends StateModel {
-	private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
-	private final String appName;
-	private DeploymentManager deploymentManager;
-
-	public AppStateModel(DeploymentManager deploymentManager, String appName) {
-		this.deploymentManager = deploymentManager;
-		this.appName = appName;
-	}
-
-	@Transition(from = "OFFLINE", to = "ONLINE")
-	public void deploy(Message message, NotificationContext context)
-			throws Exception {
-		logger.info("Deploying app:"+ appName);
-		HelixManager manager = context.getManager();
-		ConfigAccessor configAccessor = manager.getConfigAccessor();
-		AppConfig config = createAppConfig(manager, configAccessor);
-		deploymentManager.deploy(config);
-		logger.info("Deployed app:"+ appName);
-
-	}
-
-	private AppConfig createAppConfig(HelixManager manager,
-			ConfigAccessor configAccessor) {
-		ConfigScopeBuilder builder = new ConfigScopeBuilder();
-		ConfigScope scope = builder.forCluster(manager.getClusterName())
-				.forResource(appName).build();
-		String appURI = configAccessor.get(scope,
-				DeploymentManager.S4R_URI);
-		String clusterName = manager.getClusterName();
-		String appClassName = null;
-		List<String> customModulesNames = new ArrayList<String>();
-		List<String> customModulesURIs = new ArrayList<String>();
-		Map<String, String> namedParameters = new HashMap<String, String>();
-		AppConfig config = new AppConfig(clusterName, appClassName, appURI,
-				customModulesNames, customModulesURIs, namedParameters);
-		return config;
-	}
-
-	@Transition(from = "OFFLINE", to = "ONLINE")
-	public void undeploy(Message message, NotificationContext context)
-			throws Exception {
-		logger.info("Undeploying app:"+ appName);
-		HelixManager manager = context.getManager();
-		ConfigAccessor configAccessor = manager.getConfigAccessor();
-		AppConfig config = createAppConfig(manager, configAccessor);
-		deploymentManager.undeploy(config);
-		logger.info("Undeploying app:"+ appName);
-
-	}
+    private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+    private final String appName;
+    private final ArchiveFetcher fetcher;
+
+    public AppStateModel(String appName, ArchiveFetcher fetcher) {
+        this.appName = appName;
+        this.fetcher = fetcher;
+    }
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void deploy(Message message, NotificationContext context) throws Exception {
+        logger.info("Deploying app:" + appName);
+        HelixManager manager = context.getManager();
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        AppConfig appConfig = createAppConfig(manager, configAccessor);
+        loadModulesAndStartNode(S4HelixBootstrap.rootInjector, appConfig);
+        logger.info("Deployed app:" + appName);
+
+    }
+
+    private AppConfig createAppConfig(HelixManager manager, ConfigAccessor configAccessor) {
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
+        AppConfig appConfig = new AppConfig.Builder()
+                .appClassName(configAccessor.get(scope, AppConfig.APP_CLASS))
+                .appName(configAccessor.get(scope, AppConfig.APP_NAME))
+                .customModulesNames(
+                        getListFromCommaSeparatedValues(configAccessor.get(scope, AppConfig.MODULES_CLASSES)))
+                .customModulesURIs(getListFromCommaSeparatedValues(configAccessor.get(scope, AppConfig.MODULES_URIS)))
+                .appURI(configAccessor.get(scope, AppConfig.APP_URI)).build();
+
+        return appConfig;
+    }
+
+    private static List<String> getListFromCommaSeparatedValues(String values) {
+        if (com.google.common.base.Strings.isNullOrEmpty(values)) {
+            return Collections.emptyList();
+        }
+        return Arrays.asList(values.split("[,]"));
+
+    }
+
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void undeploy(Message message, NotificationContext context) throws Exception {
+        logger.info("Undeploying app:" + appName);
+        HelixManager manager = context.getManager();
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        AppConfig config = createAppConfig(manager, configAccessor);
+        // deploymentManager.undeploy(config);
+        logger.info("Undeploying app:" + appName);
+
+    }
+
+    private void loadModulesAndStartNode(final Injector parentInjector, final AppConfig appConfig)
+            throws ArchiveFetchException {
+
+        String appName = appConfig.getAppName();
+
+        List<File> modulesLocalCopies = new ArrayList<File>();
+
+        for (String uriString : appConfig.getCustomModulesURIs()) {
+            modulesLocalCopies.add(S4Bootstrap.fetchModuleAndCopyToLocalFile(appName, uriString, fetcher));
+        }
+        final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+
+        Thread t = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                // load app class through modules classloader and start it
+                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                // signalOneAppLoaded.countDown();
+            }
+        }, "S4 platform loader");
+        t.start();
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
index 4f6dc00..2aa376a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -2,24 +2,23 @@ package org.apache.s4.deploy;
 
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.Server;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
 public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
-    private final DeploymentManager deploymentManager;
 
-    @Inject
-    public AppStateModelFactory(DeploymentManager deploymentManager, ArchiveFetcher fetcher) {
-        this.deploymentManager = deploymentManager;
+    private final ArchiveFetcher fetcher;
 
+    @Inject
+    public AppStateModelFactory(ArchiveFetcher fetcher) {
+        this.fetcher = fetcher;
     }
 
     @Override
     public AppStateModel createNewStateModel(String partitionName) {
-        return new AppStateModel(deploymentManager,partitionName);
+        return new AppStateModel(partitionName, fetcher);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index caec699..c5ba6ed 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -11,7 +11,7 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
     private final Server server;
     boolean deployed = false;
     private final String clusterName;
-	private ArchiveFetcher fetcher;
+    private final ArchiveFetcher fetcher;
 
     @Inject
     public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
@@ -20,23 +20,22 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
             @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
         this.clusterName = clusterName;
         this.server = server;
-		this.fetcher = fetcher;
+        this.fetcher = fetcher;
 
     }
 
     @Override
     public void start() {
-        
     }
 
-	@Override
-	public void deploy(AppConfig appConfig) throws DeploymentFailedException {
-		DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
-	}
+    @Override
+    public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+    }
 
-	@Override
-	public void undeploy(AppConfig appConfig) {
-		
-	}
+    @Override
+    public void undeploy(AppConfig appConfig) {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/resources/default.s4.core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/default.s4.core.properties b/subprojects/s4-core/src/main/resources/default.s4.core.properties
index cb5e20a..8b13789 100644
--- a/subprojects/s4-core/src/main/resources/default.s4.core.properties
+++ b/subprojects/s4-core/src/main/resources/default.s4.core.properties
@@ -1 +1 @@
-s4.logger_level = DEBUG
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 76a1d43..01eec0f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -135,7 +135,7 @@ public class Deploy extends S4ArgsBase {
 
     }
 
-    private static Map<String, String> convertListArgsToMap(List<String> args) {
+    public static Map<String, String> convertListArgsToMap(List<String> args) {
         Map<String, String> result = Maps.newHashMap();
         for (String arg : args) {
             String[] split = arg.split("[=]");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 813f13d..cc3d128 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -1,13 +1,13 @@
 package org.apache.s4.tools.helix;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
@@ -15,15 +15,25 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.tools.IdealStateCalculatorByShuffling;
-import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.comm.HelixBasedCommModule;
+import org.apache.s4.core.HelixBasedCoreModule;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.tools.Deploy;
+import org.apache.s4.tools.Deploy.InlineConfigParameterConverter;
 import org.apache.s4.tools.S4ArgsBase;
 import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 
 public class DeployApp extends S4ArgsBase {
+
+    private static Logger logger = LoggerFactory.getLogger(DeployApp.class);
+
     public static void main(String[] args) {
         DeployAppArgs deployArgs = new DeployAppArgs();
 
@@ -31,10 +41,40 @@ public class DeployApp extends S4ArgsBase {
 
         HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
         ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        // ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
         ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
         Map<String, String> properties = new HashMap<String, String>();
-        properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
+
+        URI s4rURI = null;
+        if (deployArgs.s4rPath != null) {
+            try {
+                s4rURI = new URI(deployArgs.s4rPath);
+            } catch (URISyntaxException e) {
+                logger.error("Cannot get URI from s4r parameter: {}", deployArgs.s4rPath);
+                return;
+            }
+            if (Strings.isNullOrEmpty(s4rURI.getScheme())) {
+                // default is file
+                s4rURI = new File(deployArgs.s4rPath).toURI();
+            }
+            logger.info(
+                    "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+                    s4rURI.toString());
+        } else {
+            throw new RuntimeException("Not specifying s4r URI (as -s4r parameter) is not supported yet");
+        }
+
+        ImmutableList<String> helixModules = ImmutableList.of(HelixBasedCommModule.class.getName(),
+                HelixBasedCoreModule.class.getName());
+        // TODO merge with custom modules
+
+        AppConfig appConfig = new AppConfig.Builder().appClassName(deployArgs.appClass).appName(deployArgs.appName)
+                .appURI(deployArgs.s4rPath).customModulesNames(helixModules).customModulesURIs(null)
+                .namedParameters(Deploy.convertListArgsToMap(deployArgs.extraNamedParameters)).build();
+        // properties.put("appConfig", appConfig.asMap());
+        // properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
         properties.put("type", "App");
+        properties.putAll(appConfig.asMap());
         admin.setConfig(scope, properties);
 
         IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
@@ -78,8 +118,14 @@ public class DeployApp extends S4ArgsBase {
         @Parameter(names = { "-appName" }, description = "Name of the App", required = true, arity = 1)
         String appName;
 
+        @Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application class (extending App or AdapterApp)", required = false)
+        String appClass = "";
+
         @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the App needs to be deployed", required = false, arity = 1)
         String nodeGroup = "default";
 
+        @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+        List<String> extraNamedParameters = new ArrayList<String>();
+
     }
 }