Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/04 04:40:11 UTC

[2/2] git commit: interim checkin. Can't get the bindings working

Updated Branches:
  refs/heads/S4-110-new [created] 55f8c2157


interim checkin. Can't get the bindings working


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/55f8c215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/55f8c215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/55f8c215

Branch: refs/heads/S4-110-new
Commit: 55f8c2157d267f22bddea56ecf6ed2c3540ea3d0
Parents: 6cd8742
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sun Feb 3 20:38:13 2013 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sun Feb 3 20:38:13 2013 -0800

----------------------------------------------------------------------
 build.gradle                                       |   15 +
 .../src/main/java/org/apache/s4/base/Emitter.java  |    2 +
 .../org/apache/s4/comm/HelixBasedCommModule.java   |  147 +++++
 .../org/apache/s4/comm/helix/S4StateModel.java     |   54 ++
 .../s4/comm/helix/TaskStateModelFactory.java       |   12 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    5 +
 .../s4/comm/topology/AssignmentFromHelix.java      |  143 +++++
 .../java/org/apache/s4/comm/topology/Cluster.java  |   11 +
 .../apache/s4/comm/topology/ClusterFromHelix.java  |  190 ++++++
 .../org/apache/s4/comm/topology/ClusterFromZK.java |   10 +
 .../apache/s4/comm/topology/ClustersFromHelix.java |   64 ++
 .../apache/s4/comm/topology/PhysicalCluster.java   |    8 +
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |    5 +
 .../main/java/org/apache/s4/core/BaseModule.java   |   50 ++-
 .../main/java/org/apache/s4/core/Bootstrap.java    |    7 +
 .../java/org/apache/s4/core/DefaultCoreModule.java |    8 +-
 .../org/apache/s4/core/HelixBasedCoreModule.java   |  134 ++++
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |    2 +-
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |  139 +++++
 .../src/main/java/org/apache/s4/core/S4Node.java   |    6 +-
 .../java/org/apache/s4/deploy/AppStateModel.java   |   73 +++
 .../org/apache/s4/deploy/AppStateModelFactory.java |   25 +
 .../org/apache/s4/deploy/DeploymentManager.java    |    9 +-
 .../java/org/apache/s4/deploy/DeploymentUtils.java |   65 ++
 .../s4/deploy/DistributedDeploymentManager.java    |   72 +--
 .../s4/deploy/HelixBasedDeploymentManager.java     |   42 ++
 .../apache/s4/deploy/NoOpDeploymentManager.java    |   14 +
 .../core/moduleloader/ModuleLoaderTestUtils.java   |    2 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    2 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    2 +-
 .../src/main/java/org/apache/s4/tools/Tools.java   |   18 +-
 .../java/org/apache/s4/tools/helix/AddNodes.java   |   90 +++
 .../org/apache/s4/tools/helix/CreateCluster.java   |   99 +++
 .../java/org/apache/s4/tools/helix/CreateTask.java |   91 +++
 .../java/org/apache/s4/tools/helix/DeployApp.java  |   85 +++
 .../apache/s4/tools/helix/GenericEventAdapter.java |   72 +++
 .../org/apache/s4/tools/helix/RebalanceTask.java   |   61 ++
 .../java/org/apache/s4/tools/helix/S4Status.java   |  468 +++++++++++++++
 38 files changed, 2231 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b36a770..828ba30 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,6 +40,7 @@ allprojects {
         maven { url 'http://google-gson.googlecode.com/svn/mavenrepo' }
         maven { url 'https://repo.springsource.org/libs-release' }
         maven { url 'http://repo.gradle.org/gradle/libs-releases-local' }
+        maven { url 'http://maven.restlet.org' }
 
         /* Add lib dir as a repo. Some jar files that are not available
          in a public repo are distributed in the lib dir. */
@@ -71,6 +72,10 @@ project.ext["libraries"] = [
     commons_lang:       'commons-lang:commons-lang:2.6',
     commons_logging:    'commons-logging:commons-logging:1.1.1',
     commons_collections:'commons-collections:commons-collections:3.2.1',
+    commons_cli:        'commons-cli:commons-cli:1.2',
+    commons_math:       'org.apache.commons:commons-math:2.1',
+    restlet_neo:        'com.noelios.restlet:com.noelios.restlet:1.1.10',
+    restlet:            'org.restlet:org.restlet:1.1.10',
     objenesis:          'org.objenesis:objenesis:1.2',
     slf4j:              'org.slf4j:slf4j-api:1.6.1',
     log4j:              'log4j:log4j:1.2.15',
@@ -80,6 +85,9 @@ project.ext["libraries"] = [
     jcip:               'net.jcip:jcip-annotations:1.0',
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
+    helix:              'org.apache.helix:helix-core:0.6.0-incubating',
+    jackson_core_asl:   'org.codehaus.jackson:jackson-core-asl:1.8.5',  
+    jackson_mapper_asl: 'org.codehaus.jackson:jackson-mapper-asl:1.8.5',  
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
     jcommander:         'com.beust:jcommander:1.25',
     asm:                'org.ow2.asm:asm:4.0',
@@ -130,11 +138,18 @@ subprojects {
         compile( libraries.commons_io )
         compile ( libraries.commons_lang )
         runtime(libraries.commons_collections)
+        runtime(libraries.commons_cli)
+        runtime(libraries.commons_math)
 
         /* Misc. */
         compile( libraries.jcip )
         compile( libraries.zk )
         compile( libraries.metrics )
+        compile(libraries.helix)
+        compile(libraries.jackson_core_asl)
+        compile(libraries.jackson_mapper_asl)
+        runtime(libraries.restlet)
+        runtime(libraries.restlet_neo)
 
         /* Testing. */
         testCompile( libraries.junit )

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index 1f74381..e2ee412 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@ -40,6 +40,8 @@ public interface Emitter {
     boolean send(int partitionId, ByteBuffer message) throws InterruptedException;
 
     int getPartitionCount();
+    
+    int getPartitionCount(String stream);
 
     void close();
 }
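
A minimal usage sketch for the new per-stream lookup. The StreamRouter class, its wiring, and the stream/key values are illustrative assumptions rather than part of this patch; it also assumes s4-base's Hasher maps a String key to a long, as the default hasher does.

    import java.nio.ByteBuffer;

    import org.apache.s4.base.Emitter;
    import org.apache.s4.base.Hasher;

    // Hypothetical caller, not part of this commit: routes a keyed message to
    // the partition owning that key within a given stream.
    public class StreamRouter {

        private final Emitter emitter;
        private final Hasher hasher;

        public StreamRouter(Emitter emitter, Hasher hasher) {
            this.emitter = emitter;
            this.hasher = hasher;
        }

        public boolean route(String streamName, String key, ByteBuffer message) throws InterruptedException {
            // per-stream partition count, as introduced by this patch
            int partitionCount = emitter.getPartitionCount(streamName);
            int partitionId = (int) (Math.abs(hasher.hash(key)) % partitionCount);
            return emitter.send(partitionId, message);
        }
    }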

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
new file mode 100644
index 0000000..980f9bd
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -0,0 +1,147 @@
+package org.apache.s4.comm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromHelix;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromHelix;
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+
+public class HelixBasedCommModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(HelixBasedCommModule.class);
+    InputStream commConfigInputStream;
+    private PropertiesConfiguration config;
+    String clusterName;
+
+    /**
+     * 
+     * @param commConfigInputStream
+     *            input stream from a configuration file
+     * @param clusterName
+     *            the name of the cluster to which the current node belongs. If specified in the configuration file,
+     *            this parameter will be ignored.
+     */
+    public HelixBasedCommModule(InputStream commConfigInputStream, String clusterName) {
+        super();
+        this.commConfigInputStream = commConfigInputStream;
+        this.clusterName = clusterName;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (commConfigInputStream != null) {
+            try {
+                commConfigInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+        /* Use Kryo to serialize events. */
+        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+                SerializerDeserializerFactory.class));
+
+        // a node holds a single partition assignment
+        // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+                TaskStateModelFactory.class);
+        
+        bind(Assignment.class).to(AssignmentFromHelix.class);
+        
+        bind(Cluster.class).to(ClusterFromHelix.class);
+
+        bind(Clusters.class).to(ClustersFromHelix.class);
+
+        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+
+        bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
+        
+        try {
+            Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
+                    .getString("s4.comm.emitter.class"));
+            bind(Emitter.class).to(emitterClass);
+
+            // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+            Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
+                    .getString("s4.comm.emitter.remote.class"));
+            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+                    RemoteEmitterFactory.class));
+            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+
+//            bind(Listener.class).to(
+//                    (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
+
+        } catch (ClassNotFoundException e) {
+            logger.error("Cannot find class implementation ", e);
+        }
+
+    }
+
+    @SuppressWarnings("serial")
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(commConfigInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+            if (clusterName != null) {
+                if (config.containsKey("s4.cluster.name")) {
+                    logger.warn(
+                            "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
+                            clusterName, config.getProperty("s4.cluster.name"));
+                } else {
+                    Names.bindProperties(binder, new HashMap<String, String>() {
+                        {
+                            put("s4.cluster.name", clusterName);
+                        }
+                    });
+                }
+            }
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+}
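
A minimal sketch of how this module might be installed when building a node's injector. The resource name, cluster name, and surrounding class are illustrative assumptions; as the commit message notes, the bindings are still being worked out, so this shows intended usage rather than a verified path.

    import java.io.InputStream;

    import org.apache.s4.base.Emitter;
    import org.apache.s4.comm.HelixBasedCommModule;

    import com.google.inject.Guice;
    import com.google.inject.Injector;

    // Hypothetical wiring, not part of this commit.
    public class CommWiringSketch {

        public static void main(String[] args) {
            // assumes a comm properties file on the classpath that defines
            // s4.comm.emitter.class and s4.comm.emitter.remote.class
            InputStream commConfig = CommWiringSketch.class
                    .getResourceAsStream("/default.s4.comm.properties");
            // the clusterName argument is ignored if s4.cluster.name is already set in the file
            Injector injector = Guice.createInjector(new HelixBasedCommModule(commConfig, "cluster1"));
            // the emitter implementation named in the configuration is now injectable
            Emitter emitter = injector.getInstance(Emitter.class);
        }
    }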

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
new file mode 100644
index 0000000..ea35bd6
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
@@ -0,0 +1,54 @@
+package org.apache.s4.comm.helix;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(states = { "LEADER", "STANDBY" }, initialState = "OFFLINE")
+public class S4StateModel extends StateModel {
+    private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
+
+    private final String streamName;
+    private final String partitionId;
+
+    public S4StateModel(String partitionName) {
+        String[] parts = partitionName.split("_");
+        this.streamName = parts[0];
+        this.partitionId = parts[1];
+    }
+
+    @Transition(from = "OFFLINE", to = "STANDBY")
+    public void becomeStandbyFromOffline(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + " for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "STANDBY", to = "LEADER")
+    public void becomeLeaderFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "LEADER", to = "STANDBY")
+    public void becomeStandbyFromLeader(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "STANDBY", to = "OFFLINE")
+    public void becomeOfflineFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void dropPartition(Message msg, NotificationContext context) {
+        logger.info("Dropping partition" + msg.getPartitionName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
new file mode 100644
index 0000000..3b885a8
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -0,0 +1,12 @@
+package org.apache.s4.comm.helix;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel> {
+
+    @Override
+    public S4StateModel createNewStateModel(String partitionName) {
+        return new S4StateModel(partitionName);
+    }
+
+}
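
For context, a minimal sketch of how the state model and factory above get driven: a Helix participant registers the factory under the LeaderStandby state model, and Helix then instantiates one S4StateModel per assigned partition and calls its annotated transition methods. The cluster, instance, and ZooKeeper values are illustrative assumptions; the same registration is performed by S4HelixBootstrap later in this patch.

    import org.apache.helix.HelixManager;
    import org.apache.helix.HelixManagerFactory;
    import org.apache.helix.InstanceType;
    import org.apache.s4.comm.helix.TaskStateModelFactory;

    // Hypothetical participant setup, not part of this commit.
    public class ParticipantSketch {

        public static void main(String[] args) throws Exception {
            HelixManager manager = HelixManagerFactory.getZKHelixManager(
                    "cluster1", "cluster1_0", InstanceType.PARTICIPANT, "localhost:2181");
            // one S4StateModel is created per partition, e.g. "names_2" -> stream "names", partition 2
            manager.getStateMachineEngine().registerStateModelFactory(
                    "LeaderStandby", new TaskStateModelFactory());
            manager.connect();
        }
    }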

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index fd0ad2b..d31f04c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -277,6 +277,11 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
+    
+    @Override
+    public int getPartitionCount(String streamName) {
+        return topology.getPhysicalCluster().getPartitionCount(streamName);
+    }
 
     class ExceptionHandler extends SimpleChannelUpstreamHandler {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
new file mode 100644
index 0000000..243faac
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -0,0 +1,143 @@
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+@Singleton
+public class AssignmentFromHelix implements Assignment {
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromHelix.class);
+
+    private String clusterName;
+    private final String zookeeperAddress;
+    private String machineId;
+    private HelixManager zkHelixManager;
+
+    private HelixDataAccessor helixDataAccessor;
+    AtomicReference<ClusterNode> clusterNodeRef;
+    private final Lock lock;
+    private final AtomicBoolean currentlyOwningTask;
+    private final Condition taskAcquired;
+
+    private final StateModelFactory<? extends StateModel> taskStateModelFactory;
+
+    // private final StateModelFactory<? extends StateModel> appStateModelFactory;
+
+    @Inject
+    public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+            throws Exception {
+        this.taskStateModelFactory = new TaskStateModelFactory();
+        // this.appStateModelFactory = appStateModelFactory;
+        this.clusterName = clusterName;
+        this.zookeeperAddress = zookeeperAddress;
+        machineId = "localhost";
+        lock = new ReentrantLock();
+        ZkClient zkClient = new ZkClient(zookeeperAddress);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+        BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+        helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+        clusterNodeRef = new AtomicReference<ClusterNode>();
+        taskAcquired = lock.newCondition();
+        currentlyOwningTask = new AtomicBoolean(true);
+        try {
+            machineId = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            machineId = "UNKNOWN";
+        }
+        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
+        clusterNodeRef.set(node);
+        currentlyOwningTask.set(true);
+    }
+
+    @Inject
+    public void init() {
+        // joinCluster();
+    }
+
+    @Override
+    public ClusterNode assignClusterNode() {
+        lock.lock();
+        try {
+            while (!currentlyOwningTask.get()) {
+                taskAcquired.awaitUninterruptibly();
+            }
+        } catch (Exception e) {
+            logger.error("Exception while waiting to join the cluster");
+            return null;
+        } finally {
+            lock.unlock();
+        }
+        return clusterNodeRef.get();
+    }
+
+    public void joinClusterOld() {
+        lock.lock();
+        try {
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            do {
+                List<InstanceConfig> instances = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
+                List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+                for (InstanceConfig instanceConfig : instances) {
+                    String instanceName = instanceConfig.getInstanceName();
+                    if (!liveInstances.contains(instanceName)) {
+                        zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                                InstanceType.PARTICIPANT, zookeeperAddress);
+                        zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
+                                taskStateModelFactory);
+
+                        zkHelixManager.connect();
+                        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
+                                instanceName);
+                        clusterNodeRef.set(node);
+                        currentlyOwningTask.set(true);
+                        taskAcquired.signalAll();
+                        break;
+                    }
+                }
+                if (instances.size() == liveInstances.size()) {
+                    System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
+                    Thread.sleep(100000);
+                }
+            } while (!currentlyOwningTask.get());
+            System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index 2ff6e97..e3c2eef 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.s4.comm.topology;
 
+import org.apache.helix.model.InstanceConfig;
+
 /**
  * Represents a logical cluster
  * 
@@ -29,4 +31,13 @@ public interface Cluster {
     public void addListener(ClusterChangeListener listener);
 
     public void removeListener(ClusterChangeListener listener);
+    /**
+     * TODO: Use a destination class that provides details on node name and port
+     * @param streamName the stream whose destination is being looked up
+     * @param partitionId the partition of that stream
+     * @return the InstanceConfig of the node currently assigned this stream partition, or null if none
+     */
+    InstanceConfig getDestination(String streamName, int partitionId);
+
+    Integer getPartitionCount(String streamName);
 }
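
A minimal sketch of resolving a network destination from the new getDestination() contract. The helper class is hypothetical; it only relies on the host and port getters of Helix's InstanceConfig.

    import org.apache.helix.model.InstanceConfig;

    import org.apache.s4.comm.topology.Cluster;

    // Hypothetical helper, not part of this commit.
    public class DestinationResolver {

        // Returns "host:port" for the node currently assigned the given stream
        // partition, or null when no assignment exists yet.
        public String resolve(Cluster cluster, String streamName, int partitionId) {
            InstanceConfig destination = cluster.getDestination(streamName, partitionId);
            if (destination == null) {
                return null;
            }
            return destination.getHostName() + ":" + destination.getPort();
        }
    }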

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
new file mode 100644
index 0000000..19e0628
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.topology;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+/**
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
+ * configuration.
+ * 
+ */
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
+
+    private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
+
+    private final String clusterName;
+    private final AtomicReference<PhysicalCluster> clusterRef;
+    private final List<ClusterChangeListener> listeners;
+    private final Lock lock;
+    private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+
+    /**
+     * only the local topology
+     */
+    @Inject
+    public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
+
+    }
+
+    /**
+     * any topology
+     */
+    public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
+
+    }
+
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+        lock.lock();
+        try {
+            logger.info("Start:Processing change in cluster topology");
+            super.onExternalViewChange(externalViewList, changeContext);
+            HelixManager manager = changeContext.getManager();
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            ConfigAccessor configAccessor = manager.getConfigAccessor();
+            ConfigScopeBuilder builder = new ConfigScopeBuilder();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+            Map<String, Integer> map = new HashMap<String, Integer>();
+            for (String resource : resources) {
+                String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
+                        "type");
+                if ("Task".equalsIgnoreCase(resourceType)) {
+                    String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
+                            .build(), "streamName");
+                    IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+                    map.put(streamName, idealstate.getNumPartitions());
+                }
+            }
+            partitionCountMapRef.set(map);
+            for (ClusterChangeListener listener : listeners) {
+                listener.onChange();
+            }
+            logger.info("End:Processing change in cluster topology");
+
+        } catch (Exception e) {
+            logger.error("", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public PhysicalCluster getPhysicalCluster() {
+        return clusterRef.get();
+    }
+
+    @Override
+    public void addListener(ClusterChangeListener listener) {
+        logger.info("Adding topology change listener:" + listener);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(ClusterChangeListener listener) {
+        logger.info("Removing topology change listener:" + listener);
+        listeners.remove(listener);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClusterFromHelix other = (ClusterFromHelix) obj;
+        if (clusterName == null) {
+            if (other.clusterName != null)
+                return false;
+        } else if (!clusterName.equals(other.clusterName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
+        if (instances.size() == 1) {
+            return instances.get(0);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Integer getPartitionCount(String streamName) {
+        Integer numPartitions = partitionCountMapRef.get().get(streamName);
+        if (numPartitions == null) {
+            return -1;
+        }
+        return numPartitions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 45e0ae4..f2b60d5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,5 +209,14 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
         doProcess();
 
     }
+    @Override
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        return null;
+    }
+
+    @Override
+    public Integer getPartitionCount(String streamName) {
+        return null;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
new file mode 100644
index 0000000..4e5351b
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Monitors all clusters
+ * 
+ */
+public class ClustersFromHelix implements Clusters {
+    private static final Logger logger = LoggerFactory.getLogger(ClustersFromHelix.class);
+    private String machineId;
+    private Map<String, ClusterFromHelix> clusters = new HashMap<String, ClusterFromHelix>();
+    private int connectionTimeout;
+    private String clusterName;
+
+    @Inject
+    public ClustersFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        this.connectionTimeout = connectionTimeout;
+
+    }
+
+    public Cluster getCluster(String clusterName) {
+        return clusters.get(clusterName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 7c590ac..233316d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -97,6 +97,14 @@ public class PhysicalCluster {
     public int getPartitionCount() {
         return numPartitions;
     }
+    
+    /**
+     * 
+     * @return the number of partitions for the given stream (currently the cluster-wide partition count)
+     */
+    public int getPartitionCount(String streamName) {
+        return numPartitions;
+    }
 
     /**
      * @param node

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index d335970..b6450b5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -109,6 +109,11 @@ public class UDPEmitter implements Emitter, ClusterChangeListener {
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
+    
+    @Override
+    public int getPartitionCount(String streamName) {
+        return topology.getPhysicalCluster().getPartitionCount(streamName);
+    }
 
     @Override
     public void onChange() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 582f5d0..b9679e7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -7,10 +7,15 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromHelix;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromHelix;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.HelixBasedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,11 +31,15 @@ public class BaseModule extends AbstractModule {
     private PropertiesConfiguration config;
     InputStream baseConfigInputStream;
     String clusterName;
+    private String instanceName;
+    boolean useHelix = true;
 
-    public BaseModule(InputStream baseConfigInputStream, String clusterName) {
+    public BaseModule(InputStream baseConfigInputStream, String clusterName,
+            String instanceName) {
         super();
         this.baseConfigInputStream = baseConfigInputStream;
         this.clusterName = clusterName;
+        this.instanceName = instanceName;
     }
 
     @Override
@@ -38,17 +47,33 @@ public class BaseModule extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
+        if (useHelix) {
+            bind(Assignment.class).to(AssignmentFromHelix.class)
+                    .asEagerSingleton();
+            bind(Cluster.class).to(ClusterFromHelix.class);
+            bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+
+            bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+            bind(Bootstrap.class).to(S4HelixBootstrap.class);
+
+            // share the Zookeeper connection
+            bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
+                    Scopes.SINGLETON);
+            return;
+        }
         // a node holds a single partition assignment
-        // ==> Assignment is a singleton so it shared between base, comm and app layers.
+        // ==> Assignment is a singleton so it shared between base, comm and app
+        // layers.
         // it is eager so that the node is able to join a cluster immediately
         bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
         // bind(Cluster.class).to(ClusterFromZK.class);
 
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
-        bind(S4Bootstrap.class);
+        bind(Bootstrap.class).to(S4Bootstrap.class);
 
         // share the Zookeeper connection
-        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
+                Scopes.SINGLETON);
 
     }
 
@@ -61,7 +86,8 @@ public class BaseModule extends AbstractModule {
             // TODO - validate properties.
 
             /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+            Names.bindProperties(binder,
+                    ConfigurationConverter.getProperties(config));
 
             if (clusterName != null) {
                 if (config.containsKey("s4.cluster.name")) {
@@ -76,6 +102,20 @@ public class BaseModule extends AbstractModule {
                     });
                 }
             }
+            if (instanceName != null) {
+                if (config.containsKey("s4.instance.name")) {
+                    logger.warn(
+                            "instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
+                            instanceName,
+                            config.getProperty("s4.instance.name"));
+                } else {
+                    Names.bindProperties(binder, new HashMap<String, String>() {
+                        {
+                            put("s4.instance.name", instanceName);
+                        }
+                    });
+                }
+            }
 
         } catch (ConfigurationException e) {
             binder.addError(e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Bootstrap.java
new file mode 100644
index 0000000..3bc818b
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Bootstrap.java
@@ -0,0 +1,7 @@
+package org.apache.s4.core;
+
+import com.google.inject.Injector;
+
+public interface Bootstrap {
+    void start(Injector parentInjector) throws Exception;
+}
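
A minimal sketch of how a node entry point might drive the new Bootstrap abstraction, relying on the Bootstrap binding added to BaseModule above. The resource, cluster, and instance names are illustrative assumptions, and, as with the comm module, the commit message indicates the bindings are not working yet, so this shows intent rather than a verified path.

    import java.io.InputStream;

    import org.apache.s4.core.BaseModule;
    import org.apache.s4.core.Bootstrap;

    import com.google.inject.Guice;
    import com.google.inject.Injector;

    // Hypothetical entry point, not part of this commit.
    public class NodeStartSketch {

        public static void main(String[] args) throws Exception {
            InputStream baseConfig = NodeStartSketch.class
                    .getResourceAsStream("/default.s4.base.properties");
            // instance names of the form <cluster>_<partition> match the parsing in AssignmentFromHelix
            Injector injector = Guice.createInjector(new BaseModule(baseConfig, "cluster1", "cluster1_0"));
            // resolves to S4HelixBootstrap while BaseModule's useHelix flag is hard-coded to true
            Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
            bootstrap.start(injector);
        }
    }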

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index c4cafee..30e44b2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
@@ -38,8 +39,10 @@ import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
+import org.apache.s4.deploy.AppStateModelFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.deploy.HelixBasedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +87,10 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
+                AppStateModelFactory.class);
+        
+        bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
         bind(S4RLoaderFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
new file mode 100644
index 0000000..0ce80ed
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.util.S4RLoaderFactory;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.deploy.HelixBasedDeploymentManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+
+/**
+ * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
+ * until we have a better way to customize node configuration
+ * 
+ */
+public class HelixBasedCoreModule extends AbstractModule {
+
+    private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
+
+    InputStream coreConfigFileInputStream;
+    private PropertiesConfiguration config;
+
+    String clusterName = null;
+
+    public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
+        this.coreConfigFileInputStream = coreConfigFileInputStream;
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (coreConfigFileInputStream != null) {
+            try {
+                coreConfigFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+
+        bind(S4RLoaderFactory.class);
+
+        // For enabling checkpointing, one needs to use a custom module, such as
+        // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
+        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+        // shed load in local sender only by default
+        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
+        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
+
+        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+
+        bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
+    }
+
+    @Provides
+    @Named("s4.tmp.dir")
+    public File provideTmpDir() {
+        File tmpS4Dir = Files.createTempDir();
+        tmpS4Dir.deleteOnExit();
+        logger.warn(
+                "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+                tmpS4Dir.getAbsolutePath());
+        return tmpS4Dir;
+    }
+
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(coreConfigFileInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 77dbc67..61f6b7f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -58,7 +58,7 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
  * 
  * 
  */
-public class S4Bootstrap {
+public class S4Bootstrap implements Bootstrap {
     private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
 
     private final ZkClient zkClient;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
new file mode 100644
index 0000000..d20c962
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -0,0 +1,139 @@
+package org.apache.s4.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.ModulesLoaderFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.s4.deploy.AppStateModelFactory;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Named;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Modules.OverriddenModuleBuilder;
+
+/**
+ * This is the bootstrap for S4 nodes.
+ * <p>
+ * Its roles are to:
+ * <ul>
+ * <li>register within the S4 cluster (and acquire a partition).
+ * <li>look for applications deployed on the S4 cluster
+ * </ul>
+ * <p>
+ * When an application is available, custom modules are fetched if necessary and
+ * a full-featured S4 node is started. The application code is then downloaded
+ * and the app started.
+ * <p>
+ * For testing purposes, it is also possible to directly start an application
+ * without fetching remote code, provided the application classes are available
+ * in the classpath.
+ * 
+ * 
+ * 
+ */
+public class S4HelixBootstrap implements Bootstrap {
+    private static Logger logger = LoggerFactory.getLogger(S4HelixBootstrap.class);
+
+    private final AtomicBoolean deployed = new AtomicBoolean(false);
+
+    private final ArchiveFetcher fetcher;
+
+    private Injector parentInjector;
+
+    CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+
+    private final String clusterName;
+
+    private final String instanceName;
+
+    private final String zookeeperAddress;
+
+    @Inject
+    private TaskStateModelFactory taskStateModelFactory;
+
+    @Inject
+    private AppStateModelFactory appStateModelFactory;
+
+    @Inject
+    private Cluster cluster;
+
+    @Inject
+    public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.instance.name") String instanceName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
+            ArchiveFetcher fetcher) {
+        this.clusterName = clusterName;
+        this.instanceName = instanceName;
+        this.zookeeperAddress = zookeeperAddress;
+        this.fetcher = fetcher;
+        registerWithHelix();
+    }
+
+    public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
+        this.parentInjector = parentInjector;
+        if (!deployed.get()) {
+            // deployment is triggered through Helix state transitions (see AppStateModel)
+        }
+        signalOneAppLoaded.await();
+    }
+
+    private void registerWithHelix() {
+        HelixManager helixManager;
+        try {
+            helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                    InstanceType.PARTICIPANT, zookeeperAddress);
+            // state model factories must be registered before connecting the manager
+            helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
+            helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
+            helixManager.connect();
+            // the injected Cluster implementation also acts as a Helix RoutingTableProvider
+            helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
+        } catch (Exception e) {
+            logger.error("Cannot register node " + instanceName + " with Helix cluster " + clusterName, e);
+        }
+    }
+
+}
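
Note that registerWithHelix() keeps the HelixManager in a local variable, so the participant is never disconnected explicitly. If the manager were promoted to a field, a shutdown hook along the following lines could release the Helix/ZooKeeper session cleanly; this is a hypothetical sketch, not part of the commit.

    import org.apache.helix.HelixManager;

    // hypothetical sketch: disconnect the participant when the JVM exits
    public class HelixShutdownHook {
        public static void register(final HelixManager helixManager) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    if (helixManager != null && helixManager.isConnected()) {
                        helixManager.disconnect();
                    }
                }
            });
        }
    }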

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index 907fcb9..db84c35 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -50,7 +50,7 @@ public class S4Node {
         });
 
         Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
-                "default.s4.base.properties").openStream(), nodeArgs.clusterName) });
+                "default.s4.base.properties").openStream(), nodeArgs.clusterName, nodeArgs.instanceName) });
         S4Bootstrap bootstrap = injector.getInstance(S4Bootstrap.class);
         try {
             bootstrap.start(injector);
@@ -74,6 +74,8 @@ public class S4Node {
 
         @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
         String zkConnectionString;
-
+        
+        @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
+        String instanceName = null;
     }
 }
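
With the new -id/-nodeId parameter a node can be pinned to the Helix instance id it was assigned when the cluster was populated, e.g. s4 node -id=localhost_12000, alongside the existing cluster and ZooKeeper flags (not shown in this hunk). The host_port instance id convention comes from the AddNodes tool introduced further down in this commit; the exact flag syntax depends on the NodeArgs definition, which is only partially visible here.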

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
new file mode 100644
index 0000000..5f21f51
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -0,0 +1,73 @@
+package org.apache.s4.deploy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.s4.core.util.AppConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(states = { "ONLINE", "OFFLINE" }, initialState = "OFFLINE")
+public class AppStateModel extends StateModel {
+	private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+	private final String appName;
+	private DeploymentManager deploymentManager;
+
+	public AppStateModel(DeploymentManager deploymentManager, String appName) {
+		this.deploymentManager = deploymentManager;
+		this.appName = appName;
+	}
+
+	@Transition(from = "OFFLINE", to = "ONLINE")
+	public void deploy(Message message, NotificationContext context)
+			throws Exception {
+		logger.info("Deploying app:"+ appName);
+		HelixManager manager = context.getManager();
+		ConfigAccessor configAccessor = manager.getConfigAccessor();
+		AppConfig config = createAppConfig(manager, configAccessor);
+		deploymentManager.deploy(config);
+		logger.info("Deployed app:"+ appName);
+
+	}
+
+	private AppConfig createAppConfig(HelixManager manager,
+			ConfigAccessor configAccessor) {
+		ConfigScopeBuilder builder = new ConfigScopeBuilder();
+		ConfigScope scope = builder.forCluster(manager.getClusterName())
+				.forResource(appName).build();
+		String appURI = configAccessor.get(scope,
+				DeploymentManager.S4R_URI);
+		String clusterName = manager.getClusterName();
+		String appClassName = null;
+		List<String> customModulesNames = new ArrayList<String>();
+		List<String> customModulesURIs = new ArrayList<String>();
+		Map<String, String> namedParameters = new HashMap<String, String>();
+		AppConfig config = new AppConfig(clusterName, appClassName, appURI,
+				customModulesNames, customModulesURIs, namedParameters);
+		return config;
+	}
+
+	@Transition(from = "ONLINE", to = "OFFLINE")
+	public void undeploy(Message message, NotificationContext context)
+			throws Exception {
+		logger.info("Undeploying app:"+ appName);
+		HelixManager manager = context.getManager();
+		ConfigAccessor configAccessor = manager.getConfigAccessor();
+		AppConfig config = createAppConfig(manager, configAccessor);
+		deploymentManager.undeploy(config);
+		logger.info("Undeployed app:" + appName);
+
+	}
+
+}
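
The deploy transition above reads the application URI from the Helix resource configuration under DeploymentManager.S4R_URI, so something has to write that key when the application resource is created. The sketch below shows one plausible writing side; it is illustrative only (the actual DeployApp tool is part of this commit but not shown here) and it assumes that ConfigAccessor can be built from a raw ZkClient and that ConfigAccessor.set(scope, key, value) is available in the Helix version in use.

    import org.apache.helix.ConfigAccessor;
    import org.apache.helix.ConfigScope;
    import org.apache.helix.ConfigScopeBuilder;
    import org.apache.helix.manager.zk.ZkClient;
    import org.apache.s4.deploy.DeploymentManager;

    public class S4rUriWriterSketch {
        // illustrative helper: stores the S4R location where AppStateModel.createAppConfig() looks it up
        public static void writeS4rUri(ZkClient zkClient, String clusterName, String appName, String s4rUri) {
            ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
            ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).forResource(appName).build();
            configAccessor.set(scope, DeploymentManager.S4R_URI, s4rUri);
        }
    }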

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
new file mode 100644
index 0000000..4f6dc00
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -0,0 +1,25 @@
+package org.apache.s4.deploy;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.Server;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
+    private final DeploymentManager deploymentManager;
+
+    @Inject
+    public AppStateModelFactory(DeploymentManager deploymentManager, ArchiveFetcher fetcher) {
+        this.deploymentManager = deploymentManager;
+
+    }
+
+    @Override
+    public AppStateModel createNewStateModel(String partitionName) {
+        return new AppStateModel(deploymentManager,partitionName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
index e92e97d..1b8994e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentManager.java
@@ -18,13 +18,20 @@
 
 package org.apache.s4.deploy;
 
+import org.apache.s4.core.util.AppConfig;
+
 /**
  * Marker interface for deployment managers. Allows to supply a no-op deployment manager through dependency injection.
  * (TODO that hack should be improved!)
  * 
  */
 public interface DeploymentManager {
-
+    public static final String S4R_URI = "s4r_uri";
+    
     void start();
+    
+    void deploy(AppConfig appConfig) throws DeploymentFailedException;
+    
+    void undeploy(AppConfig appConfig) throws DeploymentFailedException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
index cc404af..fc42754 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -1,14 +1,25 @@
 package org.apache.s4.deploy;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Server;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
 public class DeploymentUtils {
 
     private static Logger logger = LoggerFactory.getLogger(DeploymentUtils.class);
@@ -34,5 +45,59 @@ public class DeploymentUtils {
         }
         zk.close();
     }
+    public static void deploy(Server server, ArchiveFetcher fetcher, String clusterName, AppConfig appConfig) throws DeploymentFailedException {
+        if (appConfig.getAppURI() == null) {
+            if (appConfig.getAppClassName() != null) {
+                try {
+                    App app = (App) DeploymentUtils.class.getClassLoader().loadClass(appConfig.getAppClassName()).newInstance();
+                    server.startApp(app, "appName", clusterName);
+                } catch (Exception e) {
+                    logger.error("Cannot start application: cannot instantiate app class {} due to: {}",
+                            appConfig.getAppClassName(), e.getMessage());
+                    return;
+                }
+            }
+            logger.info("{} value not set for {} : no application code will be downloaded", DeploymentManager.S4R_URI, appConfig.getAppName());
+            return;
+        }
+        try {
+            URI uri = new URI(appConfig.getAppURI());
 
+            // fetch application
+            File localS4RFileCopy;
+            try {
+                localS4RFileCopy = File.createTempFile("tmp", "s4r");
+            } catch (IOException e1) {
+                logger.error(
+                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+                        appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e1);
+            }
+            localS4RFileCopy.deleteOnExit();
+            try {
+                if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+                    throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
+                }
+            } catch (Exception e) {
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                        + "] from URI [" + uri.toString() + "] ", e);
+            }
+            // install locally
+            App loaded = server.loadApp(localS4RFileCopy, appConfig.getAppName());
+            if (loaded != null) {
+                logger.info("Successfully installed application {}", appConfig.getAppName());
+                // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
+                server.startApp(loaded, appConfig.getAppName(), clusterName);
+            } else {
+                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+                        + "] from URI [" + uri.toString() + "] : cannot start application");
+            }
+
+        } catch (URISyntaxException e) {
+            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+                    appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() });
+            throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index fc05c48..60d66bc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -68,7 +68,6 @@ import com.google.inject.name.Named;
  */
 public class DistributedDeploymentManager implements DeploymentManager {
 
-    public static final String S4R_URI = "s4r_uri";
 
     private static Logger logger = LoggerFactory.getLogger(DistributedDeploymentManager.class);
 
@@ -103,63 +102,14 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
     public void deployApplication() throws DeploymentFailedException {
         ZNRecord appData = zkClient.readData(appPath);
-        AppConfig appConfig = new AppConfig(appData);
-        if (appConfig.getAppURI() == null) {
-            if (appConfig.getAppClassName() != null) {
-                try {
-                    App app = (App) getClass().getClassLoader().loadClass(appConfig.getAppClassName()).newInstance();
-                    server.startApp(app, "appName", clusterName);
-                } catch (Exception e) {
-                    logger.error("Cannot start application: cannot instantiate app class {} due to: {}",
-                            appConfig.getAppClassName(), e.getMessage());
-                    return;
-                }
-            }
-            logger.info("{} value not set for {} : no application code will be downloaded", S4R_URI, appPath);
-            return;
-        }
-        try {
-            URI uri = new URI(appConfig.getAppURI());
-
-            // fetch application
-            File localS4RFileCopy;
-            try {
-                localS4RFileCopy = File.createTempFile("tmp", "s4r");
-            } catch (IOException e1) {
-                logger.error(
-                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
-                        appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e1);
-            }
-            localS4RFileCopy.deleteOnExit();
-            try {
-                if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
-                    throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
-                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
-                }
-            } catch (Exception e) {
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
-                        + "] from URI [" + uri.toString() + "] ", e);
-            }
-            // install locally
-            App loaded = server.loadApp(localS4RFileCopy, appConfig.getAppName());
-            if (loaded != null) {
-                logger.info("Successfully installed application {}", appConfig.getAppName());
-                // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
-                server.startApp(loaded, appConfig.getAppName(), clusterName);
-            } else {
-                throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
-                        + "] from URI [" + uri.toString() + "] : cannot start application");
-            }
-
-        } catch (URISyntaxException e) {
-            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
-                    appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() });
-            throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e);
-        }
+        AppConfig appConfig = new AppConfig(appData);
+        deploy(appConfig);
         deployed = true;
     }
 
+
+
     // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
     // but that's probably not that useful, and we can simply provide whichever protocol is needed
 
@@ -191,4 +141,16 @@ public class DistributedDeploymentManager implements DeploymentManager {
             }
         }
     }
+
+    @Override
+    public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+        // fetch, install and start the application described by the given configuration
+        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+    }
+
+    @Override
+    public void undeploy(AppConfig appConfig) {
+        // undeploying a running application is not supported by this deployment manager yet
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
new file mode 100644
index 0000000..caec699
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -0,0 +1,42 @@
+package org.apache.s4.deploy;
+
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.Server;
+import org.apache.s4.core.util.AppConfig;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class HelixBasedDeploymentManager implements DeploymentManager {
+    private final Server server;
+    boolean deployed = false;
+    private final String clusterName;
+    private final ArchiveFetcher fetcher;
+
+    @Inject
+    public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
+        this.clusterName = clusterName;
+        this.server = server;
+        this.fetcher = fetcher;
+    }
+
+    @Override
+    public void start() {
+        // nothing to do at startup: deployments are triggered by Helix state transitions (see AppStateModel)
+    }
+
+    @Override
+    public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+    }
+
+    @Override
+    public void undeploy(AppConfig appConfig) {
+        // undeploying a running application is not supported yet
+    }
+
+}
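
For AppStateModelFactory to end up with this implementation, DeploymentManager has to be bound to HelixBasedDeploymentManager in the node's Guice modules. A minimal sketch of such a binding follows; the real wiring is expected to live in HelixBasedCoreModule (only partially shown in this commit) and may differ.

    import org.apache.s4.deploy.DeploymentManager;
    import org.apache.s4.deploy.HelixBasedDeploymentManager;

    import com.google.inject.AbstractModule;
    import com.google.inject.Scopes;

    // sketch only: the actual bindings may differ
    public class HelixDeploymentBindingSketch extends AbstractModule {
        @Override
        protected void configure() {
            bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
        }
    }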

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
index 27777d8..4aea928 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/NoOpDeploymentManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.s4.deploy;
 
+import org.apache.s4.core.util.AppConfig;
+
 /**
  * Does not handle any deployment (hence does not require any cluster configuration settings)
  * 
@@ -28,4 +30,16 @@ public class NoOpDeploymentManager implements DeploymentManager {
     public void start() {
         // does nothing
     }
+
+    @Override
+    public void deploy(AppConfig appConfig) {
+        // does nothing
+    }
+
+    @Override
+    public void undeploy(AppConfig appConfig) {
+        // does nothing
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index e5d0d01..1ca862c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -68,7 +68,7 @@ public class ModuleLoaderTestUtils {
         }
 
         Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
-                .openStream(), "cluster1"), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                .openStream(), "cluster1", null), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
                 .openStream()));
 
         Emitter emitter = injector.getInstance(TCPEmitter.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 9b24f3c..965a621 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -116,7 +116,7 @@ public class CoreTestUtils extends CommTestUtils {
 
     public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
         return Guice.createInjector(Modules.override(
-                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1"),
+                new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
                 new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
                 new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
                 new NonFailFastZookeeperClientsModule()));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5b4f596..c25d756 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -59,7 +59,7 @@ public class WordCountTest extends ZkBasedTest {
 
     public void createEmitter() throws IOException {
         injector = Guice.createInjector(new BaseModule(
-                Resources.getResource("default.s4.base.properties").openStream(), "cluster1"), new DefaultCommModule(
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null), new DefaultCommModule(
                 Resources.getResource("default.s4.comm.properties").openStream()), new DefaultCoreModule(Resources
                 .getResource("default.s4.core.properties").openStream()));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 832cfcc..d25f7a7 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -26,6 +26,13 @@ import java.util.List;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
 import org.apache.s4.core.S4Node;
+import org.apache.s4.tools.helix.AddNodes;
+import org.apache.s4.tools.helix.CreateCluster;
+import org.apache.s4.tools.helix.CreateTask;
+import org.apache.s4.tools.helix.DeployApp;
+import org.apache.s4.tools.helix.GenericEventAdapter;
+import org.apache.s4.tools.helix.RebalanceTask;
+import org.apache.s4.tools.helix.S4Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,9 +46,14 @@ public class Tools {
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
     enum Task {
-        deploy(Deploy.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(
-                null), newApp(CreateApp.class), s4r(Package.class), status(Status.class);
-
+        //deploy(Deploy.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(
+          //      null), newApp(CreateApp.class), s4r(Package.class), status(Status.class);
+
+        deployApp(DeployApp.class), node(S4Node.class), zkServer(ZKServer.class), newCluster(CreateCluster.class),
+        genericAdapter(GenericEventAdapter.class), newApp(CreateApp.class), s4r(Package.class), status(S4Status.class),
+        addNodes(AddNodes.class), createTask(CreateTask.class), rebalanceTask(RebalanceTask.class);
+
         Class<?> target;
 
         Task(Class<?> target) {
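
Each Task constant ties a sub-command name to the class whose main() handles it, so the s4 script now exposes deployApp, newCluster, addNodes, createTask, rebalanceTask, genericAdapter and the Helix-based status alongside node, zkServer, newApp and s4r. The dispatch code itself is outside this hunk; the sketch below only illustrates how such an enum-to-main lookup typically works and is not the actual Tools.main() body.

    package org.apache.s4.tools;

    import java.lang.reflect.Method;
    import java.util.Arrays;

    // illustrative only: the real dispatch lives in Tools.main()
    public class TaskDispatchSketch {
        public static void dispatch(String[] args) throws Exception {
            // the first argument selects the task, the rest is forwarded to the target class
            Tools.Task task = Tools.Task.valueOf(args[0]);
            String[] remaining = Arrays.copyOfRange(args, 1, args.length);
            Method main = task.target.getMethod("main", String[].class);
            main.invoke(null, (Object) remaining);
        }
    }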

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/55f8c215/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
new file mode 100644
index 0000000..136a4aa
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/AddNodes.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.tools.S4ArgsBase;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class AddNodes {
+
+    static Logger logger = LoggerFactory.getLogger(AddNodes.class);
+
+    public static void main(String[] args) {
+
+        ZKServerArgs clusterArgs = new ZKServerArgs();
+        Tools.parseArgs(clusterArgs, args);
+        try {
+
+            logger.info("Adding new nodes [{}] to cluster [{}] node(s)", clusterArgs.nbNodes, clusterArgs.clusterName);
+            HelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
+            int initialPort = clusterArgs.firstListeningPort;
+            if (clusterArgs.nbNodes > 0) {
+                String[] split = clusterArgs.nodes.split(",");
+                for (int i = 0; i < clusterArgs.nbNodes; i++) {
+                    String host = "localhost";
+                    if (split.length > 0 && split.length == clusterArgs.nbNodes) {
+                        host = split[i].trim();
+                    }
+                    // instance ids follow the Helix <host>_<port> convention
+                    InstanceConfig instanceConfig = new InstanceConfig(host + "_" + initialPort);
+                    instanceConfig.setHostName(host);
+                    instanceConfig.setPort("" + initialPort);
+                    instanceConfig.getRecord().setSimpleField("GROUP", clusterArgs.nodeGroup);
+                    helixAdmin.addInstance(clusterArgs.clusterName, instanceConfig);
+                    initialPort = initialPort + 1;
+                }
+            }
+            logger.info("New nodes configuration uploaded into zookeeper");
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+
+    }
+
+    @Parameters(commandNames = "s4 addNodes", separators = "=", commandDescription = "Setup new S4 logical cluster")
+    static class ZKServerArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "-nbNodes", description = "number of S4 nodes for the cluster", required = true)
+        int nbNodes = 1;
+
+        @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
+        String nodes = "";
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
+        String nodeGroup = "default";
+    }
+
+}
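
Taken together with the node id parameter added to S4Node above, a typical sequence is: create the cluster, then run e.g. s4 addNodes -c=cluster1 -nbNodes=2 -flp=12000, which registers the Helix instances localhost_12000 and localhost_12001 (the listening port is incremented per node), and finally start each node with the matching -id value. The -zk flag defaults to localhost:2181, and -ng assigns the nodes to a group that can later be referenced when creating tasks.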