You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/08 16:45:37 UTC

git commit: fixes so that all zk tests pass and helix generic injection runs correctly - when a cluster node dies and another node with same destination appears, the connection must be reinitiated. This is detected when sending a new message. This messag

Updated Branches:
  refs/heads/S4-110-new 75981b873 -> 0d2f441a0


fixes so that all zk tests pass and helix generic injection runs correctly
- when a cluster node dies and another node with same destination appears, the connection
must be reinitiated. This is detected when sending a new message. This message is lost
but the channel is closed and destination removed, so a new connection can be initiated
- in recovery tests, this implies sending another message.
- also factored helix custom modules and bootstrap


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0d2f441a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0d2f441a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0d2f441a

Branch: refs/heads/S4-110-new
Commit: 0d2f441a0963bacaf633eb3353678aac730ecd6a
Parents: 75981b8
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Feb 8 17:39:26 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Feb 8 17:39:26 2013 +0100

----------------------------------------------------------------------
 .../src/main/resources/default.s4.base.properties  |    5 +-
 .../org/apache/s4/comm/HelixBasedCommModule.java   |   86 ++-------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   16 ++-
 .../main/java/org/apache/s4/core/BaseModule.java   |    5 +-
 .../org/apache/s4/core/HelixBasedCoreModule.java   |   69 +------------
 .../java/org/apache/s4/core/S4HelixBootstrap.java  |   76 +-------------
 .../java/org/apache/s4/deploy/AppStateModel.java   |    2 +-
 .../s4/deploy/DistributedDeploymentManager.java    |   38 ++-----
 .../org/apache/s4/core/ft/FTWordCountTest.java     |    9 ++
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |    7 +
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    3 +-
 11 files changed, 58 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-base/src/main/resources/default.s4.base.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/resources/default.s4.base.properties b/subprojects/s4-base/src/main/resources/default.s4.base.properties
index 681232e..c5a56d2 100644
--- a/subprojects/s4-base/src/main/resources/default.s4.base.properties
+++ b/subprojects/s4-base/src/main/resources/default.s4.base.properties
@@ -2,4 +2,7 @@
 s4.cluster.zk_address = localhost:2181
 s4.cluster.zk_session_timeout = 10000
 s4.cluster.zk_connection_timeout = 10000
-s4.logger_level = DEBUG
\ No newline at end of file
+s4.logger_level = DEBUG
+
+# use helix for cluster management, application lifecycle, and partitioning
+s4.helix = false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 7084f1e..b2e92c7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,37 +1,18 @@
 package org.apache.s4.comm;
 
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
-import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
-import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.ClustersFromHelix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Scopes;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 public class HelixBasedCommModule extends AbstractModule {
 
-    private static Logger logger = LoggerFactory.getLogger(HelixBasedCommModule.class);
-    private InputStream commConfigInputStream;
-    private PropertiesConfiguration config;
-
+    private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
 
     /**
      * 
@@ -41,68 +22,19 @@ public class HelixBasedCommModule extends AbstractModule {
      *            the name of the cluster to which the current node belongs. If specified in the configuration file,
      *            this parameter will be ignored.
      */
-    public HelixBasedCommModule(InputStream commConfigInputStream) {
-        this.commConfigInputStream = commConfigInputStream;
+    public HelixBasedCommModule() {
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (commConfigInputStream != null) {
-            try {
-                commConfigInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-        /* Use Kryo to serialize events. */
-        // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
-        // classloader
-        install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
-                SerializerDeserializerFactory.class));
 
-        // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
-        bind(Clusters.class).to(ClustersFromHelix.class).in(Scopes.SINGLETON);
-        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+        // a node holds a single partition assignment
+        // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+                TaskStateModelFactory.class);
 
-        bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
+        bind(Clusters.class).to(ClustersFromHelix.class);
 
-        try {
-            Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
-                    .getString("s4.comm.emitter.class"));
-            bind(Emitter.class).to(emitterClass);
-
-            // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
-            Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
-                    .getString("s4.comm.emitter.remote.class"));
-            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
-                    RemoteEmitterFactory.class));
-
-        } catch (ClassNotFoundException e) {
-            logger.error("Cannot find class implementation ", e);
-        }
     }
 
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(commConfigInputStream);
-
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index cca28b2..89a2e42 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -75,7 +75,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     private final int nettyTimeout;
 
-    private Cluster topology;
+    private final Cluster topology;
     private final ClientBootstrap bootstrap;
 
     /*
@@ -120,7 +120,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         this.lock = new ReentrantLock();
 
         // Initialize data structures
-       // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+        // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
         destinationChannelMap = HashBiMap.create();
 
         // Initialize netty related structures
@@ -128,6 +128,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                 Executors.newCachedThreadPool());
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
             @Override
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
@@ -188,14 +189,14 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             }
         }
 
-        writePermits.get(destination).acquire();
-
         Channel c = destinationChannelMap.get(destination);
         if (c == null) {
             logger.warn("Could not find channel for destination {}", destination);
             return false;
         }
 
+        writePermits.get(destination).acquire();
+
         c.write(buffer).addListener(new MessageSendingListener(destination));
         return true;
     }
@@ -303,12 +304,17 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                     // TODO handle possible cluster reconfiguration between send and failure callback
                     logger.warn("Failed to send message to node {} (according to current cluster information)",
                             destination);
+                    future.getChannel().close();
+                    destinationChannelMap.inverse().remove(future.getChannel());
                 } catch (IndexOutOfBoundsException ignored) {
                     logger.error("Failed to send message to partition {}", destination);
                     // cluster was changed
                 }
             } else {
-                metrics.sentMessage(destination);
+                // TODO fix metrics initialization for helix
+                if (metrics != null) {
+                    metrics.sentMessage(destination);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index ccb478f..7f0a539 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -47,9 +47,10 @@ public class BaseModule extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
+        // share the Zookeeper connection
         bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
-        if (config.containsKey("s4.helix") && config.getBoolean("s4.helix")) {
+        if (config.getBoolean("s4.helix")) {
             bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
             bind(Cluster.class).to(ClusterFromHelix.class);
             bind(TaskStateModelFactory.class);
@@ -58,8 +59,6 @@ public class BaseModule extends AbstractModule {
 
             bind(Bootstrap.class).to(S4HelixBootstrap.class);
 
-            // share the Zookeeper connection
-            return;
         } else {
             // a node holds a single partition assignment
             // ==> Assignment is a singleton so it shared between base, comm and app

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 12ba248..01f086c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,39 +19,19 @@
 package org.apache.s4.core;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
 
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.topology.HelixRemoteStreams;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.ZkRemoteStreams;
-import org.apache.s4.core.ft.CheckpointingFramework;
-import org.apache.s4.core.ft.NoOpCheckpointingFramework;
-import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.SenderExecutorServiceFactory;
-import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.s4.deploy.HelixBasedDeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Files;
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.name.Named;
-import com.google.inject.name.Names;
 
 /**
  * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -61,49 +41,18 @@ import com.google.inject.name.Names;
 public class HelixBasedCoreModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
-    private InputStream coreConfigFileInputStream;
-    private PropertiesConfiguration config;
 
-    public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
-        this.coreConfigFileInputStream = coreConfigFileInputStream;
+    public HelixBasedCoreModule() {
     }
 
     @Override
     protected void configure() {
 
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (coreConfigFileInputStream != null) {
-            try {
-                coreConfigFileInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-        
         bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
-        bind(S4RLoaderFactory.class);
-
-        // For enabling checkpointing, one needs to use a custom module, such as
-        // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
-        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
-
-        // shed load in local sender only by default
-        bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
-        bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
-
-        bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
-
         bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
         bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
 
-        
     }
 
     @Provides
@@ -116,21 +65,5 @@ public class HelixBasedCoreModule extends AbstractModule {
                 tmpS4Dir.getAbsolutePath());
         return tmpS4Dir;
     }
-    //TODO: move this to a base class
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(coreConfigFileInputStream);
-
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 5e5c40f..3944971 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,12 +1,7 @@
 package org.apache.s4.core;
 
-import java.io.InputStream;
 import java.net.Inet4Address;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -18,28 +13,17 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.HelixBasedCommModule;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.util.ArchiveFetchException;
 import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParametersInjectionModule;
 import org.apache.s4.deploy.AppStateModelFactory;
-import org.apache.s4.deploy.DeploymentManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Strings;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
-import com.google.inject.Module;
 import com.google.inject.name.Named;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
 
 /**
  * This is the bootstrap for S4 nodes.
@@ -130,63 +114,5 @@ public class S4HelixBootstrap implements Bootstrap {
             e.printStackTrace();
         }
     }
-    public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
-        try {
-            Injector injector;
-            InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
-            InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
-
-            logger.info("Initializing S4 app with : {}", appConfig.toString());
-
-            AbstractModule commModule = new HelixBasedCommModule(commConfigFileInputStream);
-            AbstractModule coreModule = new HelixBasedCoreModule(coreConfigFileInputStream);
-
-            List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
-            for (String moduleClass : appConfig.getCustomModulesNames()) {
-                extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
-            }
-            Module combinedModule = Modules.combine(commModule, coreModule);
-            if (extraModules.size() > 0) {
-                OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
-                combinedModule = overridenModuleBuilder.with(extraModules);
-            }
-
-            if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
-
-                logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
-                Map<String, String> namedParameters = new HashMap<String, String>();
-
-                namedParameters.putAll(appConfig.getNamedParameters());
-                combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
-            }
-
-            if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
-                // In that case we won't be using an S4R classloader, app classes are available from the current
-                // classloader
-                // The app module provides bindings specific to the app class loader, in this case the current thread's
-                // class loader.
-                AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
-                // NOTE: because the app module can be overriden
-                combinedModule = Modules.override(appModule).with(combinedModule);
-                injector = parentInjector.createChildInjector(combinedModule);
-                logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
-                App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
-                app.init();
-                app.start();
-            } else {
-                injector = parentInjector.createChildInjector(combinedModule);
-                if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
-                    logger.info("S4 node in standby until app class or app URI is specified");
-                }
-                Server server = injector.getInstance(Server.class);
-                server.setInjector(injector);
-                DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
-                deploymentManager.deploy(appConfig);
-                // server.start(injector);
-            }
-        } catch (Exception e) {
-            logger.error("Cannot start S4 node", e);
-            System.exit(1);
-        }
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 427d999..b70c27f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -99,7 +99,7 @@ public class AppStateModel extends StateModel {
             @Override
             public void run() {
                 // load app class through modules classloader and start it
-                S4HelixBootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
                 // signalOneAppLoaded.countDown();
             }
         }, "S4 platform loader");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 60d66bc..e345080 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -18,25 +18,17 @@
 
 package org.apache.s4.deploy;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.App;
 import org.apache.s4.core.Server;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
@@ -68,7 +60,6 @@ import com.google.inject.name.Named;
  */
 public class DistributedDeploymentManager implements DeploymentManager {
 
-
     private static Logger logger = LoggerFactory.getLogger(DistributedDeploymentManager.class);
 
     private final String clusterName;
@@ -102,14 +93,9 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
     public void deployApplication() throws DeploymentFailedException {
         ZNRecord appData = zkClient.readData(appPath);
-		AppConfig appConfig = new AppConfig(appData);
-        deploy(appConfig);
-        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
-        deployed = true;
+        AppConfig appConfig = new AppConfig(appData);
     }
 
-
-
     // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
     // but that's probably not that useful, and we can simply provide whichever protocol is needed
 
@@ -142,15 +128,15 @@ public class DistributedDeploymentManager implements DeploymentManager {
         }
     }
 
-	@Override
-	public void deploy(AppConfig appConfig) {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void undeploy(AppConfig appConfig) {
-		// TODO Auto-generated method stub
-		
-	}
+    @Override
+    public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+        deployed = true;
+    }
+
+    @Override
+    public void undeploy(AppConfig appConfig) {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index a1ec575..40dfab4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -19,6 +19,7 @@
 package org.apache.s4.core.ft;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -97,6 +98,10 @@ public class FTWordCountTest extends ZkBasedTest {
                 .watchAndSignalCreation("/classifierIteration_"
                         + (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
                         sentence2Processed, zk);
+        // when a node dies, the communication channel still exists
+        // and we need to send a message to detect it is broken and create a new one
+        emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+                ByteBuffer.wrap(new byte[0]));
 
         injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
 
@@ -107,6 +112,10 @@ public class FTWordCountTest extends ZkBasedTest {
         // crash the app
         forkedS4App.destroy();
         restartNode();
+        // when a node dies, the communication channel still exists
+        // and we need to send a message to detect it is broken and create a new one
+        emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+                ByteBuffer.wrap(new byte[0]));
 
         // add authorizations for continuing processing. Without these, the
         // WordClassifier processed keeps waiting

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 1236e14..c27bbb5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.s4.core.ft;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -168,10 +169,16 @@ public class RecoveryTest extends ZkBasedTest {
         CountDownLatch signalValue2Set = new CountDownLatch(1);
         CoreTestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
 
+        // when a node dies, the communication channel still exists
+        // and we need to send a message to detect it is broken and create a new one
+        emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+                ByteBuffer.wrap(new byte[0]));
+
         event = new Event();
         event.setStreamId("inputStream");
         event.put("command", String.class, "setValue2");
         event.put("value", String.class, "message2");
+
         emitter.send(
                 new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
                 injector.getInstance(SerializerDeserializerFactory.class)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 965a621..29bdc0a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -31,10 +31,9 @@ import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.S4Node;
 import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProgressListener;
 import org.gradle.tooling.ProjectConnection;
 
-import sun.net.ProgressListener;
-
 import com.google.common.io.PatternFilenameFilter;
 import com.google.common.io.Resources;
 import com.google.inject.Guice;