You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/08 16:45:37 UTC
git commit: fixes so that all zk tests pass and helix generic
injection runs correctly - when a cluster node dies and another node with
same destination appears,
the connection must be reinitiated. This is detected when sending a new
message. This message [... subject line truncated; the full commit message follows below]
Updated Branches:
refs/heads/S4-110-new 75981b873 -> 0d2f441a0
fixes so that all zk tests pass and helix generic injection runs correctly
- when a cluster node dies and another node with the same destination appears, the connection
must be reinitiated. This is detected when sending a new message. This message is lost,
but the channel is closed and the destination removed, so a new connection can be initiated
- in recovery tests, this implies sending another message.
- also factored helix custom modules and bootstrap
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0d2f441a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0d2f441a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0d2f441a
Branch: refs/heads/S4-110-new
Commit: 0d2f441a0963bacaf633eb3353678aac730ecd6a
Parents: 75981b8
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Feb 8 17:39:26 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Feb 8 17:39:26 2013 +0100
----------------------------------------------------------------------
.../src/main/resources/default.s4.base.properties | 5 +-
.../org/apache/s4/comm/HelixBasedCommModule.java | 86 ++-------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 16 ++-
.../main/java/org/apache/s4/core/BaseModule.java | 5 +-
.../org/apache/s4/core/HelixBasedCoreModule.java | 69 +------------
.../java/org/apache/s4/core/S4HelixBootstrap.java | 76 +-------------
.../java/org/apache/s4/deploy/AppStateModel.java | 2 +-
.../s4/deploy/DistributedDeploymentManager.java | 38 ++-----
.../org/apache/s4/core/ft/FTWordCountTest.java | 9 ++
.../java/org/apache/s4/core/ft/RecoveryTest.java | 7 +
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 3 +-
11 files changed, 58 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-base/src/main/resources/default.s4.base.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/resources/default.s4.base.properties b/subprojects/s4-base/src/main/resources/default.s4.base.properties
index 681232e..c5a56d2 100644
--- a/subprojects/s4-base/src/main/resources/default.s4.base.properties
+++ b/subprojects/s4-base/src/main/resources/default.s4.base.properties
@@ -2,4 +2,7 @@
s4.cluster.zk_address = localhost:2181
s4.cluster.zk_session_timeout = 10000
s4.cluster.zk_connection_timeout = 10000
-s4.logger_level = DEBUG
\ No newline at end of file
+s4.logger_level = DEBUG
+
+# use helix for cluster management, application lifecycle, and partitioning
+s4.helix = false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 7084f1e..b2e92c7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,37 +1,18 @@
package org.apache.s4.comm;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
-import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
-import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.ClustersFromHelix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Scopes;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
public class HelixBasedCommModule extends AbstractModule {
- private static Logger logger = LoggerFactory.getLogger(HelixBasedCommModule.class);
- private InputStream commConfigInputStream;
- private PropertiesConfiguration config;
-
+ private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
/**
*
@@ -41,68 +22,19 @@ public class HelixBasedCommModule extends AbstractModule {
* the name of the cluster to which the current node belongs. If specified in the configuration file,
* this parameter will be ignored.
*/
- public HelixBasedCommModule(InputStream commConfigInputStream) {
- this.commConfigInputStream = commConfigInputStream;
+ public HelixBasedCommModule() {
}
- @SuppressWarnings("unchecked")
@Override
protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (commConfigInputStream != null) {
- try {
- commConfigInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
- /* Use Kryo to serialize events. */
- // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
- // classloader
- install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
- SerializerDeserializerFactory.class));
- // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
- bind(Clusters.class).to(ClustersFromHelix.class).in(Scopes.SINGLETON);
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
+ // a node holds a single partition assignment
+ // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
+ bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+ TaskStateModelFactory.class);
- bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
+ bind(Clusters.class).to(ClustersFromHelix.class);
- try {
- Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
- .getString("s4.comm.emitter.class"));
- bind(Emitter.class).to(emitterClass);
-
- // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
- Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
- .getString("s4.comm.emitter.remote.class"));
- install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
- RemoteEmitterFactory.class));
-
- } catch (ClassNotFoundException e) {
- logger.error("Cannot find class implementation ", e);
- }
}
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(commConfigInputStream);
-
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index cca28b2..89a2e42 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -75,7 +75,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
private final int nettyTimeout;
- private Cluster topology;
+ private final Cluster topology;
private final ClientBootstrap bootstrap;
/*
@@ -120,7 +120,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
this.lock = new ReentrantLock();
// Initialize data structures
- // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
destinationChannelMap = HashBiMap.create();
// Initialize netty related structures
@@ -128,6 +128,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
Executors.newCachedThreadPool());
bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
@Override
public ChannelPipeline getPipeline() {
ChannelPipeline p = Channels.pipeline();
@@ -188,14 +189,14 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
}
- writePermits.get(destination).acquire();
-
Channel c = destinationChannelMap.get(destination);
if (c == null) {
logger.warn("Could not find channel for destination {}", destination);
return false;
}
+ writePermits.get(destination).acquire();
+
c.write(buffer).addListener(new MessageSendingListener(destination));
return true;
}
@@ -303,12 +304,17 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
// TODO handle possible cluster reconfiguration between send and failure callback
logger.warn("Failed to send message to node {} (according to current cluster information)",
destination);
+ future.getChannel().close();
+ destinationChannelMap.inverse().remove(future.getChannel());
} catch (IndexOutOfBoundsException ignored) {
logger.error("Failed to send message to partition {}", destination);
// cluster was changed
}
} else {
- metrics.sentMessage(destination);
+ // TODO fix metrics initialization for helix
+ if (metrics != null) {
+ metrics.sentMessage(destination);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index ccb478f..7f0a539 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -47,9 +47,10 @@ public class BaseModule extends AbstractModule {
if (config == null) {
loadProperties(binder());
}
+ // share the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
- if (config.containsKey("s4.helix") && config.getBoolean("s4.helix")) {
+ if (config.getBoolean("s4.helix")) {
bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromHelix.class);
bind(TaskStateModelFactory.class);
@@ -58,8 +59,6 @@ public class BaseModule extends AbstractModule {
bind(Bootstrap.class).to(S4HelixBootstrap.class);
- // share the Zookeeper connection
- return;
} else {
// a node holds a single partition assignment
// ==> Assignment is a singleton so it shared between base, comm and app
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 12ba248..01f086c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,39 +19,19 @@
package org.apache.s4.core;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.topology.HelixRemoteStreams;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.ZkRemoteStreams;
-import org.apache.s4.core.ft.CheckpointingFramework;
-import org.apache.s4.core.ft.NoOpCheckpointingFramework;
-import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.SenderExecutorServiceFactory;
-import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
import org.apache.s4.deploy.HelixBasedDeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
-import com.google.inject.name.Names;
/**
* Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -61,49 +41,18 @@ import com.google.inject.name.Names;
public class HelixBasedCoreModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
- private InputStream coreConfigFileInputStream;
- private PropertiesConfiguration config;
- public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
- this.coreConfigFileInputStream = coreConfigFileInputStream;
+ public HelixBasedCoreModule() {
}
@Override
protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (coreConfigFileInputStream != null) {
- try {
- coreConfigFileInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
- bind(S4RLoaderFactory.class);
-
- // For enabling checkpointing, one needs to use a custom module, such as
- // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
- bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
-
- // shed load in local sender only by default
- bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
- bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
-
- bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
-
bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
-
}
@Provides
@@ -116,21 +65,5 @@ public class HelixBasedCoreModule extends AbstractModule {
tmpS4Dir.getAbsolutePath());
return tmpS4Dir;
}
- //TODO: move this to a base class
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(coreConfigFileInputStream);
-
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 5e5c40f..3944971 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,12 +1,7 @@
package org.apache.s4.core;
-import java.io.InputStream;
import java.net.Inet4Address;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -18,28 +13,17 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.HelixBasedCommModule;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParametersInjectionModule;
import org.apache.s4.deploy.AppStateModelFactory;
-import org.apache.s4.deploy.DeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Strings;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
-import com.google.inject.Module;
import com.google.inject.name.Named;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
/**
* This is the bootstrap for S4 nodes.
@@ -130,63 +114,5 @@ public class S4HelixBootstrap implements Bootstrap {
e.printStackTrace();
}
}
- public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
- try {
- Injector injector;
- InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
- InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
-
- logger.info("Initializing S4 app with : {}", appConfig.toString());
-
- AbstractModule commModule = new HelixBasedCommModule(commConfigFileInputStream);
- AbstractModule coreModule = new HelixBasedCoreModule(coreConfigFileInputStream);
-
- List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
- for (String moduleClass : appConfig.getCustomModulesNames()) {
- extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
- }
- Module combinedModule = Modules.combine(commModule, coreModule);
- if (extraModules.size() > 0) {
- OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
- combinedModule = overridenModuleBuilder.with(extraModules);
- }
-
- if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
-
- logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
- Map<String, String> namedParameters = new HashMap<String, String>();
-
- namedParameters.putAll(appConfig.getNamedParameters());
- combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
- }
-
- if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
- // In that case we won't be using an S4R classloader, app classes are available from the current
- // classloader
- // The app module provides bindings specific to the app class loader, in this case the current thread's
- // class loader.
- AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
- // NOTE: because the app module can be overriden
- combinedModule = Modules.override(appModule).with(combinedModule);
- injector = parentInjector.createChildInjector(combinedModule);
- logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
- App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
- app.init();
- app.start();
- } else {
- injector = parentInjector.createChildInjector(combinedModule);
- if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
- logger.info("S4 node in standby until app class or app URI is specified");
- }
- Server server = injector.getInstance(Server.class);
- server.setInjector(injector);
- DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
- deploymentManager.deploy(appConfig);
- // server.start(injector);
- }
- } catch (Exception e) {
- logger.error("Cannot start S4 node", e);
- System.exit(1);
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 427d999..b70c27f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -99,7 +99,7 @@ public class AppStateModel extends StateModel {
@Override
public void run() {
// load app class through modules classloader and start it
- S4HelixBootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+ S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
// signalOneAppLoaded.countDown();
}
}, "S4 platform loader");
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 60d66bc..e345080 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -18,25 +18,17 @@
package org.apache.s4.deploy;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.App;
import org.apache.s4.core.Server;
import org.apache.s4.core.util.AppConfig;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
import com.google.inject.Inject;
import com.google.inject.name.Named;
@@ -68,7 +60,6 @@ import com.google.inject.name.Named;
*/
public class DistributedDeploymentManager implements DeploymentManager {
-
private static Logger logger = LoggerFactory.getLogger(DistributedDeploymentManager.class);
private final String clusterName;
@@ -102,14 +93,9 @@ public class DistributedDeploymentManager implements DeploymentManager {
public void deployApplication() throws DeploymentFailedException {
ZNRecord appData = zkClient.readData(appPath);
- AppConfig appConfig = new AppConfig(appData);
- deploy(appConfig);
- DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
- deployed = true;
+ AppConfig appConfig = new AppConfig(appData);
}
-
-
// NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
// but that's probably not that useful, and we can simply provide whichever protocol is needed
@@ -142,15 +128,15 @@ public class DistributedDeploymentManager implements DeploymentManager {
}
}
- @Override
- public void deploy(AppConfig appConfig) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void undeploy(AppConfig appConfig) {
- // TODO Auto-generated method stub
-
- }
+ @Override
+ public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+ DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+ deployed = true;
+ }
+
+ @Override
+ public void undeploy(AppConfig appConfig) {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index a1ec575..40dfab4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -19,6 +19,7 @@
package org.apache.s4.core.ft;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -97,6 +98,10 @@ public class FTWordCountTest extends ZkBasedTest {
.watchAndSignalCreation("/classifierIteration_"
+ (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
sentence2Processed, zk);
+ // when a node dies, the communication channel still exists
+ // and we need to send a message to detect it is broken and create a new one
+ emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+ ByteBuffer.wrap(new byte[0]));
injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
@@ -107,6 +112,10 @@ public class FTWordCountTest extends ZkBasedTest {
// crash the app
forkedS4App.destroy();
restartNode();
+ // when a node dies, the communication channel still exists
+ // and we need to send a message to detect it is broken and create a new one
+ emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+ ByteBuffer.wrap(new byte[0]));
// add authorizations for continuing processing. Without these, the
// WordClassifier processed keeps waiting
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 1236e14..c27bbb5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -19,6 +19,7 @@
package org.apache.s4.core.ft;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -168,10 +169,16 @@ public class RecoveryTest extends ZkBasedTest {
CountDownLatch signalValue2Set = new CountDownLatch(1);
CoreTestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+ // when a node dies, the communication channel still exists
+ // and we need to send a message to detect it is broken and create a new one
+ emitter.send(new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
+ ByteBuffer.wrap(new byte[0]));
+
event = new Event();
event.setStreamId("inputStream");
event.put("command", String.class, "setValue2");
event.put("value", String.class, "message2");
+
emitter.send(
new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
injector.getInstance(SerializerDeserializerFactory.class)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d2f441a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 965a621..29bdc0a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -31,10 +31,9 @@ import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.S4Node;
import org.gradle.tooling.BuildLauncher;
import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProgressListener;
import org.gradle.tooling.ProjectConnection;
-import sun.net.ProgressListener;
-
import com.google.common.io.PatternFilenameFilter;
import com.google.common.io.Resources;
import com.google.inject.Guice;