You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/08 05:28:05 UTC

git commit: [S4-110] Working version with Helix using GenericEventAdapter. Needs some more cleanup and refactoring

Updated Branches:
  refs/heads/S4-110-new 423b7e873 -> 2b718c98b


[S4-110] Working version with Helix using GenericEventAdapter. Needs some more cleanup and refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2b718c98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2b718c98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2b718c98

Branch: refs/heads/S4-110-new
Commit: 2b718c98bee855ab51b8558d9f88d6e9eb4aaafa
Parents: 423b7e8
Author: Kishore G <ki...@apache.org>
Authored: Thu Feb 7 21:26:30 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Thu Feb 7 21:27:43 2013 -0800

----------------------------------------------------------------------
 .../org/apache/s4/comm/HelixBasedCommModule.java   |   86 +++++++++-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   11 +-
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |    5 +-
 .../apache/s4/comm/topology/ClusterFromHelix.java  |  128 ++++++++++++---
 .../org/apache/s4/comm/util/EmitterMetrics.java    |    2 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    5 +-
 .../org/apache/s4/core/HelixBasedCoreModule.java   |   69 ++++++++-
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |   76 +++++++++-
 .../java/org/apache/s4/deploy/AppStateModel.java   |    2 +-
 .../apache/s4/tools/helix/GenericEventAdapter.java |    1 +
 10 files changed, 340 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index b2e92c7..7084f1e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,18 +1,37 @@
 package org.apache.s4.comm;
 
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.comm.helix.TaskStateModelFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
+import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.ClustersFromHelix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 public class HelixBasedCommModule extends AbstractModule {
 
-    private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
+    private static Logger logger = LoggerFactory.getLogger(HelixBasedCommModule.class);
+    private InputStream commConfigInputStream;
+    private PropertiesConfiguration config;
+
 
     /**
      * 
@@ -22,19 +41,68 @@ public class HelixBasedCommModule extends AbstractModule {
      *            the name of the cluster to which the current node belongs. If specified in the configuration file,
      *            this parameter will be ignored.
      */
-    public HelixBasedCommModule() {
+    public HelixBasedCommModule(InputStream commConfigInputStream) {
+        this.commConfigInputStream = commConfigInputStream;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (commConfigInputStream != null) {
+            try {
+                commConfigInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+        /* Use Kryo to serialize events. */
+        // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
+        // classloader
+        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+                SerializerDeserializerFactory.class));
 
-        // a node holds a single partition assignment
-        // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
-                TaskStateModelFactory.class);
+        // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+        bind(Clusters.class).to(ClustersFromHelix.class).in(Scopes.SINGLETON);
+        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
-        bind(Clusters.class).to(ClustersFromHelix.class);
+        bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
 
+        try {
+            Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
+                    .getString("s4.comm.emitter.class"));
+            bind(Emitter.class).to(emitterClass);
+
+            // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+            Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
+                    .getString("s4.comm.emitter.remote.class"));
+            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+                    RemoteEmitterFactory.class));
+
+        } catch (ClassNotFoundException e) {
+            logger.error("Cannot find class implementation ", e);
+        }
     }
 
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(commConfigInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 8133316..f0546a6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -120,8 +120,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         this.lock = new ReentrantLock();
 
         // Initialize data structures
-        int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
-        destinationChannelMap = HashBiMap.create(clusterSize);
+       // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+        destinationChannelMap = HashBiMap.create();
 
         // Initialize netty related structures
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -182,7 +182,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         if (!destinationChannelMap.containsKey(destination)) {
             if (!connectTo((TCPDestination) destination)) {
-                logger.warn("Could not connect to partition {}, discarding message", destination);
+                logger.warn("Could not connect to destination {}, discarding message", destination);
                 // Couldn't connect, discard message
                 return false;
             }
@@ -308,10 +308,9 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                     // cluster was changed
                 }
             } else {
-                metrics.sentMessage(destination);
-
+                //TODO:
+                //metrics.sentMessage(destination);
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index c7673ae..b389a70 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -77,6 +77,7 @@ public class TCPListener implements Listener {
     public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, final Receiver receiver,
             final DeserializerExecutorFactory deserializerExecutorFactory) {
         // wait for an assignment
+        logger.info("Initializing tcplistener");
         node = assignment.assignClusterNode();
         nettyTimeout = timeout;
         ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -104,6 +105,8 @@ public class TCPListener implements Listener {
 
         Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
         channels.add(c);
+        logger.info("Initialized tcplistener at "+  node);
+
     }
 
     @Override
@@ -155,6 +158,6 @@ public class TCPListener implements Listener {
         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
             super.channelClosed(ctx, e);
         }
-
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 639c95a..2ba5b9a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -38,12 +39,12 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.s4.base.Destination;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
-import com.sun.org.apache.bcel.internal.generic.NEW;
 
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies
@@ -61,7 +62,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     private final Lock lock;
     private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
     // Map of destination type to streamName to partitionId to Destination
-    private final AtomicReference<Map<String, Map<String, Map<String, Destination>>>> destinationInfoMapRef;
+    private final AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>> destinationInfoMapRef;
 
     /**
      * only the local topology
@@ -77,9 +78,9 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
         partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
         this.clusterRef = new AtomicReference<PhysicalCluster>();
         this.listeners = new ArrayList<ClusterChangeListener>();
-        Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+        Map<String, Map<String, Map<Integer, Destination>>> destinationMap = Collections
                 .emptyMap();
-        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>>(
                 destinationMap);
         lock = new ReentrantLock();
 
@@ -95,9 +96,9 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
         partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
         this.clusterRef = new AtomicReference<PhysicalCluster>();
         this.listeners = new ArrayList<ClusterChangeListener>();
-        Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+        Map<String, Map<String, Map<Integer, Destination>>> destinationMap = Collections
                 .emptyMap();
-        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+        destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>>(
                 destinationMap);
         lock = new ReentrantLock();
 
@@ -118,24 +119,78 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
             Builder keyBuilder = helixDataAccessor.keyBuilder();
             List<String> resources = helixDataAccessor.getChildNames(keyBuilder
                     .idealStates());
-            Map<String, Integer> map = new HashMap<String, Integer>();
-            Map<String, Map<String, Map<String, Destination>>> destinationRoutingMap;
-            destinationRoutingMap = new HashMap<String, Map<String,Map<String,Destination>>>();
-            for (String resource : resources) {
-                String resourceType = configAccessor.get(
-                        builder.forCluster(clusterName).forResource(resource)
-                                .build(), "type");
-                if ("Task".equalsIgnoreCase(resourceType)) {
-                    String streamName = configAccessor.get(
-                            builder.forCluster(clusterName)
-                                    .forResource(resource).build(),
+            Map<String, Integer> partitionCountMap = new HashMap<String, Integer>();
+
+            // populate the destinationRoutingMap
+            Map<String, Map<String, Map<Integer, Destination>>> destinationRoutingMap;
+            destinationRoutingMap = new HashMap<String, Map<String, Map<Integer, Destination>>>();
+
+            List<InstanceConfig> configList = helixDataAccessor
+                    .getChildValues(keyBuilder.instanceConfigs());
+            Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+            Map<String, Destination> tcpDestinationMap = new HashMap<String, Destination>();
+
+            Map<String, Destination> udpDestinationMap = new HashMap<String, Destination>();
+
+            for (InstanceConfig config : configList) {
+                instanceConfigMap.put(config.getId(), config);
+                try {
+                    int port = Integer.parseInt(config.getPort());
+                    Destination destination = new TCPDestination(-1, port,
+                            config.getHostName(), config.getId());
+                    tcpDestinationMap.put(config.getId(), destination);
+                    udpDestinationMap.put(config.getId(), destination);
+                } catch (NumberFormatException e) {
+                    logger.error("Invalid port:" + config, e);
+                }
+            }
+            if (externalViewList != null) {
+                for (ExternalView extView : externalViewList) {
+                    String resource = extView.getId();
+                    ConfigScope resourceScope = builder.forCluster(clusterName)
+                            .forResource(resource).build();
+                    String resourceType = configAccessor.get(resourceScope,
+                            "type");
+                    if (!"Task".equalsIgnoreCase(resourceType)) {
+                        continue;
+                    }
+                    String streamName = configAccessor.get(resourceScope,
                             "streamName");
                     IdealState idealstate = helixDataAccessor
                             .getProperty(keyBuilder.idealStates(resource));
-                    map.put(streamName, idealstate.getNumPartitions());
+                    partitionCountMap.put(streamName,
+                            idealstate.getNumPartitions());
+                    for (String partitionName : extView.getPartitionSet()) {
+                        Map<String, String> stateMap = extView
+                                .getStateMap(partitionName);
+                        for (String instanceName : stateMap.keySet()) {
+                            String currentState = stateMap.get(instanceName);
+                            if (!currentState.equals("LEADER")) {
+                                continue;
+                            }
+                            if (instanceConfigMap.containsKey(instanceName)) {
+                                InstanceConfig instanceConfig = instanceConfigMap
+                                        .get(instanceName);
+                                String destinationType = "tcp";
+                                addDestination(destinationRoutingMap,
+                                        streamName, partitionName,
+                                        "tcp", tcpDestinationMap
+                                        .get(instanceConfig.getId()));
+                                addDestination(destinationRoutingMap,
+                                        streamName, partitionName,
+                                        "tcp", udpDestinationMap
+                                        .get(instanceConfig.getId()));
+                            } else {
+                                logger.error("Invalid instance name."
+                                        + instanceName
+                                        + " .Not found in /cluster/configs/. instanceName: ");
+                            }
+                        }
+                    }
                 }
             }
-            partitionCountMapRef.set(map);
+            destinationInfoMapRef.set(destinationRoutingMap);
+            partitionCountMapRef.set(partitionCountMap);
 
             for (ClusterChangeListener listener : listeners) {
                 listener.onChange();
@@ -149,6 +204,36 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
         }
     }
 
+    private void addDestination(
+            Map<String, Map<String, Map<Integer, Destination>>> destinationRoutingMap,
+            String streamName, String partitionName, String destinationType,
+            Destination destination) {
+        if (!destinationRoutingMap
+                .containsKey(destinationType)) {
+            destinationRoutingMap
+                    .put(destinationType,
+                            new HashMap<String, Map<Integer, Destination>>());
+        }
+        Map<String, Map<Integer, Destination>> typeMap = destinationRoutingMap
+                .get(destinationType);
+        if (!typeMap.containsKey(streamName)) {
+            typeMap.put(streamName,
+                    new HashMap<Integer, Destination>());
+        }
+        Map<Integer, Destination> streamMap = typeMap
+                .get(streamName);
+        String[] split = partitionName.split("_");
+        if (split.length == 2) {
+            try {
+                int partitionId = Integer
+                        .parseInt(split[1]);
+                streamMap.put(partitionId, destination);
+            } catch (NumberFormatException e) {
+
+            }
+        }
+    }
+
     @Override
     public PhysicalCluster getPhysicalCluster() {
         return clusterRef.get();
@@ -196,11 +281,12 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
     public Destination getDestination(String streamName, int partitionId,
             String destinationType) {
 
-        Map<String, Map<String, Destination>> typeMap = destinationInfoMapRef.get().get(destinationType);
+        Map<String, Map<Integer, Destination>> typeMap = destinationInfoMapRef
+                .get().get(destinationType);
         if (typeMap == null)
             return null;
 
-        Map<String, Destination> streamMap = typeMap.get(streamName);
+        Map<Integer, Destination> streamMap = typeMap.get(streamName);
         if (streamMap == null)
             return null;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
index 41777cb..c1889ff 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -22,7 +22,7 @@ public class EmitterMetrics {
     }
 
     public void sentMessage(Destination destination) {
-        //TODO
+        //TODO:
         /*
         Map<String, Meter> map = emittersMetersMap.get(stream);
         if (map == null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 30e44b2..a7407e3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -86,11 +86,8 @@ public class DefaultCoreModule extends AbstractModule {
 
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
-
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
-                AppStateModelFactory.class);
         
-        bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
 
         bind(S4RLoaderFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 01f086c..12ba248 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,19 +19,39 @@
 package org.apache.s4.core;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.util.S4RLoaderFactory;
+import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.topology.HelixRemoteStreams;
 import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.s4.deploy.HelixBasedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.name.Named;
+import com.google.inject.name.Names;
 
 /**
  * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -41,18 +61,49 @@ import com.google.inject.name.Named;
 public class HelixBasedCoreModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
+    private InputStream coreConfigFileInputStream;
+    private PropertiesConfiguration config;
 
-    public HelixBasedCoreModule() {
+    public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
+        this.coreConfigFileInputStream = coreConfigFileInputStream;
     }
 
     @Override
     protected void configure() {
 
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (coreConfigFileInputStream != null) {
+            try {
+                coreConfigFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+        
         bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
+        bind(S4RLoaderFactory.class);
+
+        // For enabling checkpointing, one needs to use a custom module, such as
+        // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
+        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+        // shed load in local sender only by default
+        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
+        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
+
+        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+
         bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
         bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
 
+        
     }
 
     @Provides
@@ -65,5 +116,21 @@ public class HelixBasedCoreModule extends AbstractModule {
                 tmpS4Dir.getAbsolutePath());
         return tmpS4Dir;
     }
+    //TODO: move this to a base class
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(coreConfigFileInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3944971..5e5c40f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,7 +1,12 @@
 package org.apache.s4.core;
 
+import java.io.InputStream;
 import java.net.Inet4Address;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,17 +18,28 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.HelixBasedCommModule;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParametersInjectionModule;
 import org.apache.s4.deploy.AppStateModelFactory;
+import org.apache.s4.deploy.DeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Module;
 import com.google.inject.name.Named;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Modules.OverriddenModuleBuilder;
 
 /**
  * This is the bootstrap for S4 nodes.
@@ -114,5 +130,63 @@ public class S4HelixBootstrap implements Bootstrap {
             e.printStackTrace();
         }
     }
-
+    public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+        try {
+            Injector injector;
+            InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+            InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+
+            logger.info("Initializing S4 app with : {}", appConfig.toString());
+
+            AbstractModule commModule = new HelixBasedCommModule(commConfigFileInputStream);
+            AbstractModule coreModule = new HelixBasedCoreModule(coreConfigFileInputStream);
+
+            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+            for (String moduleClass : appConfig.getCustomModulesNames()) {
+                extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+            }
+            Module combinedModule = Modules.combine(commModule, coreModule);
+            if (extraModules.size() > 0) {
+                OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+                combinedModule = overridenModuleBuilder.with(extraModules);
+            }
+
+            if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+
+                logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+                Map<String, String> namedParameters = new HashMap<String, String>();
+
+                namedParameters.putAll(appConfig.getNamedParameters());
+                combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+            }
+
+            if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
+                // In that case we won't be using an S4R classloader: app classes are available from the current
+                // classloader
+                // The app module provides bindings specific to the app class loader, in this case the current thread's
+                // class loader.
+                AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
+                // NOTE: the app module is used as the base so that combinedModule can override its bindings
+                combinedModule = Modules.override(appModule).with(combinedModule);
+                injector = parentInjector.createChildInjector(combinedModule);
+                logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+                App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+                app.init();
+                app.start();
+            } else {
+                injector = parentInjector.createChildInjector(combinedModule);
+                if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+                    logger.info("S4 node in standby until app class or app URI is specified");
+                }
+                Server server = injector.getInstance(Server.class);
+                server.setInjector(injector);
+                DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
+                deploymentManager.deploy(appConfig);
+                // server.start(injector);
+            }
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
+            System.exit(1);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index b70c27f..427d999 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -99,7 +99,7 @@ public class AppStateModel extends StateModel {
             @Override
             public void run() {
                 // load app class through modules classloader and start it
-                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                S4HelixBootstrap.startS4App(appConfig, parentInjector, modulesLoader);
                 // signalOneAppLoaded.countDown();
             }
         }, "S4 platform loader");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index e640066..b16d4e0 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -43,6 +43,7 @@ public class GenericEventAdapter {
                 int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
                 Event event = new Event();
                 event.put("name", String.class, "Hello world to partition:" + partitionId);
+                event.setStreamId("names");
                 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                 KryoSerDeser serializer = new KryoSerDeser(classLoader);
 //                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));