You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/05 12:26:52 UTC
git commit: Fix dependency injection issues
Updated Branches:
refs/heads/S4-110-new 55f8c2157 -> 24a11e18b
Fix dependency injection issues
- allow for a clear separation of the bootstrap layer (i.e. process + connectivity to
cluster manager) from platform configuration (comm and core bindings) and application code
- unfortunately there is no communication between nodes now.
Main issue relates to partitioning based on streams: the Emitter implementations need
to have the information about where to send messages. It used to be a simple mapping
partitionID -> node, but by adding streams it gets more complex. One option would be to
have 1 emitter per stream. Another is to resolve mappings in the sender.
These solutions imply many changes, so I wonder whether we could first have a version
without variable partitions per stream?
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/24a11e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/24a11e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/24a11e18
Branch: refs/heads/S4-110-new
Commit: 24a11e18b35436c5d60dfe07dc6fe81565fd91f4
Parents: 55f8c21
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Feb 5 13:03:10 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Feb 5 13:18:58 2013 +0100
----------------------------------------------------------------------
.../src/main/resources/default.s4.base.properties | 3 +-
.../java/org/apache/s4/comm/DefaultCommModule.java | 6 +-
.../org/apache/s4/comm/HelixBasedCommModule.java | 109 +----------
.../apache/s4/comm/topology/ClusterFromHelix.java | 13 +-
.../s4/comm/topology/HelixRemoteStreams.java | 24 +++
.../main/java/org/apache/s4/core/BaseModule.java | 31 ++--
.../org/apache/s4/core/HelixBasedCoreModule.java | 71 +-------
.../main/java/org/apache/s4/core/S4Bootstrap.java | 14 +-
.../java/org/apache/s4/core/S4HelixBootstrap.java | 157 +++++++--------
.../src/main/java/org/apache/s4/core/S4Node.java | 9 +-
.../src/main/java/org/apache/s4/core/Server.java | 8 +-
.../java/org/apache/s4/core/util/AppConfig.java | 24 +++
.../java/org/apache/s4/deploy/AppStateModel.java | 139 ++++++++-----
.../org/apache/s4/deploy/AppStateModelFactory.java | 11 +-
.../s4/deploy/HelixBasedDeploymentManager.java | 21 +-
.../src/main/resources/default.s4.core.properties | 2 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 2 +-
.../java/org/apache/s4/tools/helix/DeployApp.java | 56 +++++-
18 files changed, 318 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-base/src/main/resources/default.s4.base.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/resources/default.s4.base.properties b/subprojects/s4-base/src/main/resources/default.s4.base.properties
index 8acb17c..681232e 100644
--- a/subprojects/s4-base/src/main/resources/default.s4.base.properties
+++ b/subprojects/s4-base/src/main/resources/default.s4.base.properties
@@ -1,4 +1,5 @@
# default properties for bootstraping a node
s4.cluster.zk_address = localhost:2181
s4.cluster.zk_session_timeout = 10000
-s4.cluster.zk_connection_timeout = 10000
\ No newline at end of file
+s4.cluster.zk_connection_timeout = 10000
+s4.logger_level = DEBUG
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index c557413..a1d6319 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -33,8 +33,6 @@ import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.ClustersFromZK;
import org.slf4j.Logger;
@@ -90,10 +88,8 @@ public class DefaultCommModule extends AbstractModule {
install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
SerializerDeserializerFactory.class));
- bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
-
+ // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
-
bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 980f9bd..b2e92c7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,48 +1,18 @@
package org.apache.s4.comm;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.helix.TaskStateModelFactory;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
-import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromHelix;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterFromHelix;
-import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.ClustersFromHelix;
-import org.apache.s4.comm.topology.ClustersFromZK;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Scopes;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
public class HelixBasedCommModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
- InputStream commConfigInputStream;
- private PropertiesConfiguration config;
- String clusterName;
/**
*
@@ -52,96 +22,19 @@ public class HelixBasedCommModule extends AbstractModule {
* the name of the cluster to which the current node belongs. If specified in the configuration file,
* this parameter will be ignored.
*/
- public HelixBasedCommModule(InputStream commConfigInputStream, String clusterName) {
- super();
- this.commConfigInputStream = commConfigInputStream;
- this.clusterName = clusterName;
+ public HelixBasedCommModule() {
}
- @SuppressWarnings("unchecked")
@Override
protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (commConfigInputStream != null) {
- try {
- commConfigInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
- /* Use Kryo to serialize events. */
- install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
- SerializerDeserializerFactory.class));
// a node holds a single partition assignment
// ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
TaskStateModelFactory.class);
-
- bind(Assignment.class).to(AssignmentFromHelix.class);
-
- bind(Cluster.class).to(ClusterFromHelix.class);
bind(Clusters.class).to(ClustersFromHelix.class);
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
-
- bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
-
- try {
- Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
- .getString("s4.comm.emitter.class"));
- bind(Emitter.class).to(emitterClass);
-
- // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
- Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
- .getString("s4.comm.emitter.remote.class"));
- install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
- RemoteEmitterFactory.class));
- bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
-
-// bind(Listener.class).to(
-// (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
-
- } catch (ClassNotFoundException e) {
- logger.error("Cannot find class implementation ", e);
- }
-
- }
-
- @SuppressWarnings("serial")
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(commConfigInputStream);
-
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
- if (clusterName != null) {
- if (config.containsKey("s4.cluster.name")) {
- logger.warn(
- "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
- clusterName, config.getProperty("s4.cluster.name"));
- } else {
- Names.bindProperties(binder, new HashMap<String, String>() {
- {
- put("s4.cluster.name", clusterName);
- }
- });
- }
- }
-
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 19e0628..7c49ac0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -27,23 +27,21 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
@@ -114,6 +112,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
}
}
partitionCountMapRef.set(map);
+
for (ClusterChangeListener listener : listeners) {
listener.onChange();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
new file mode 100644
index 0000000..b6b4558
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/HelixRemoteStreams.java
@@ -0,0 +1,24 @@
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class HelixRemoteStreams implements RemoteStreams {
+
+ @Override
+ public Set<StreamConsumer> getConsumers(String streamName) {
+ // TODO implement?
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void addOutputStream(String appId, String clusterName, String streamName) {
+ // TODO implement?
+ }
+
+ @Override
+ public void addInputStream(int appId, String clusterName, String streamName) {
+ // TODO implement?
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index b9679e7..171c564 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -6,16 +6,15 @@ import java.util.HashMap;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
-import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterFromHelix;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.HelixBasedDeploymentManager;
+import org.apache.s4.deploy.AppStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,11 +30,10 @@ public class BaseModule extends AbstractModule {
private PropertiesConfiguration config;
InputStream baseConfigInputStream;
String clusterName;
- private String instanceName;
+ private final String instanceName;
boolean useHelix = true;
- public BaseModule(InputStream baseConfigInputStream, String clusterName,
- String instanceName) {
+ public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName) {
super();
this.baseConfigInputStream = baseConfigInputStream;
this.clusterName = clusterName;
@@ -48,32 +46,31 @@ public class BaseModule extends AbstractModule {
loadProperties(binder());
}
if (useHelix) {
- bind(Assignment.class).to(AssignmentFromHelix.class)
- .asEagerSingleton();
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+ bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
bind(Cluster.class).to(ClusterFromHelix.class);
- bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+ bind(TaskStateModelFactory.class);
+ bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
+ // bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(Bootstrap.class).to(S4HelixBootstrap.class);
// share the Zookeeper connection
- bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
- Scopes.SINGLETON);
return;
}
// a node holds a single partition assignment
// ==> Assignment is a singleton so it shared between base, comm and app
// layers.
// it is eager so that the node is able to join a cluster immediately
- bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+ // bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
// bind(Cluster.class).to(ClusterFromZK.class);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(Bootstrap.class).to(S4Bootstrap.class);
// share the Zookeeper connection
- bind(ZkClient.class).toProvider(ZkClientProvider.class).in(
- Scopes.SINGLETON);
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
}
@@ -86,8 +83,7 @@ public class BaseModule extends AbstractModule {
// TODO - validate properties.
/* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
if (clusterName != null) {
if (config.containsKey("s4.cluster.name")) {
@@ -106,8 +102,7 @@ public class BaseModule extends AbstractModule {
if (config.containsKey("s4.instance.name")) {
logger.warn(
"instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
- instanceName,
- config.getProperty("s4.instance.name"));
+ instanceName, config.getProperty("s4.instance.name"));
} else {
Names.bindProperties(binder, new HashMap<String, String>() {
{
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 0ce80ed..01f086c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,38 +19,19 @@
package org.apache.s4.core;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.HelixRemoteStreams;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.ZkRemoteStreams;
-import org.apache.s4.core.ft.CheckpointingFramework;
-import org.apache.s4.core.ft.NoOpCheckpointingFramework;
-import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.apache.s4.core.staging.SenderExecutorServiceFactory;
-import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
import org.apache.s4.deploy.HelixBasedDeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
-import com.google.inject.name.Names;
/**
* Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -61,45 +42,15 @@ public class HelixBasedCoreModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
- InputStream coreConfigFileInputStream;
- private PropertiesConfiguration config;
-
- String clusterName = null;
-
- public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
- this.coreConfigFileInputStream = coreConfigFileInputStream;
+ public HelixBasedCoreModule() {
}
@Override
protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (coreConfigFileInputStream != null) {
- try {
- coreConfigFileInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
- bind(S4RLoaderFactory.class);
-
- // For enabling checkpointing, one needs to use a custom module, such as
- // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
- bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
-
- // shed load in local sender only by default
- bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
- bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
-
- bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
-
- bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+ bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
}
@@ -115,20 +66,4 @@ public class HelixBasedCoreModule extends AbstractModule {
return tmpS4Dir;
}
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(coreConfigFileInputStream);
-
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 61f6b7f..b3a5f3d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -24,6 +24,7 @@ import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.core.util.AppConfig;
import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.s4.deploy.DeploymentManager;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
*
*
*/
-public class S4Bootstrap implements Bootstrap{
+public class S4Bootstrap implements Bootstrap {
private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
private final ZkClient zkClient;
@@ -90,6 +91,7 @@ public class S4Bootstrap implements Bootstrap{
zkClient.subscribeDataChanges(appPath, new AppChangeListener());
}
+ @Override
public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
this.parentInjector = parentInjector;
if (zkClient.exists(appPath)) {
@@ -112,7 +114,7 @@ public class S4Bootstrap implements Bootstrap{
List<File> modulesLocalCopies = new ArrayList<File>();
for (String uriString : appConfig.getCustomModulesURIs()) {
- modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+ modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString, fetcher));
}
final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
@@ -129,7 +131,8 @@ public class S4Bootstrap implements Bootstrap{
}
- private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+ public static File fetchModuleAndCopyToLocalFile(String appName, String uriString, ArchiveFetcher fetcher)
+ throws ArchiveFetchException {
URI uri;
try {
@@ -208,7 +211,10 @@ public class S4Bootstrap implements Bootstrap{
logger.info("S4 node in standby until app class or app URI is specified");
}
Server server = injector.getInstance(Server.class);
- server.start(injector);
+ server.setInjector(injector);
+ DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
+ deploymentManager.deploy(appConfig);
+ // server.start(injector);
}
} catch (Exception e) {
logger.error("Cannot start S4 node", e);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index d20c962..3944971 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,51 +1,29 @@
package org.apache.s4.core;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.s4.base.util.ModulesLoader;
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.ModulesLoaderFactory;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParametersInjectionModule;
import org.apache.s4.deploy.AppStateModelFactory;
-import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
-import com.google.inject.Module;
import com.google.inject.name.Named;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
/**
* This is the bootstrap for S4 nodes.
@@ -56,84 +34,85 @@ import com.google.inject.util.Modules.OverriddenModuleBuilder;
* <li>look for application deployed on the S4 cluster
* </ul>
* <p>
- * When an application is available, custom modules are fetched if necessary and
- * a full-featured S4 node is started. The application code is then downloaded
- * and the app started.
+ * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+ * application code is then downloaded and the app started.
* <p>
- * For testing purposes, it is also possible to directly start an application
- * without fetching remote code, provided the application classes are available
- * in the classpath.
+ * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+ * application classes are available in the classpath.
*
*
*
*/
-public class S4HelixBootstrap implements Bootstrap{
- private static Logger logger = LoggerFactory
- .getLogger(S4HelixBootstrap.class);
+public class S4HelixBootstrap implements Bootstrap {
+ private static Logger logger = LoggerFactory.getLogger(S4HelixBootstrap.class);
- private final AtomicBoolean deployed = new AtomicBoolean(false);
+ private final AtomicBoolean deployed = new AtomicBoolean(false);
- private final ArchiveFetcher fetcher;
+ private final ArchiveFetcher fetcher;
- private Injector parentInjector;
+ // private Injector parentInjector;
- CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+ CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
- private String clusterName;
+ private final String clusterName;
- private String instanceName;
+ private final String instanceName;
+
+ private final String zookeeperAddress;
+ private final TaskStateModelFactory taskStateModelFactory;
+
+ private final AppStateModelFactory appStateModelFactory;
+
+ private final Cluster cluster;
+
+ private final Lock startingNode = new ReentrantLock();
+
+ public static Injector rootInjector;
- private String zookeeperAddress;
- @Inject
- private TaskStateModelFactory taskStateModelFactory;
-
- @Inject
- private AppStateModelFactory appStateModelFactory;
-
@Inject
- private Cluster cluster;
-
- @Inject
- public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
- ArchiveFetcher fetcher) {
+ public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
+ AppStateModelFactory appStateModelFactory, TaskStateModelFactory taskStateModelFactory,
+ ArchiveFetcher fetcher, Cluster cluster) {
this.clusterName = clusterName;
this.instanceName = instanceName;
this.zookeeperAddress = zookeeperAddress;
- this.fetcher = fetcher;
+ this.taskStateModelFactory = taskStateModelFactory;
+ this.appStateModelFactory = appStateModelFactory;
+ this.fetcher = fetcher;
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException, UnknownHostException {
+
+ // start a HelixController to manage the cluster
+ // TODO set this as optional (small clusters only)
+ String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
+ HelixControllerMain.startHelixController(zookeeperAddress, clusterName, controllerName,
+ HelixControllerMain.STANDALONE);
+ // this.parentInjector = parentInjector;
+ S4HelixBootstrap.rootInjector = parentInjector;
registerWithHelix();
- }
-
- public void start(Injector parentInjector) throws InterruptedException,
- ArchiveFetchException {
- this.parentInjector = parentInjector;
- if (!deployed.get()) {
-
- }
- signalOneAppLoaded.await();
- }
-
- private void registerWithHelix()
- {
- HelixManager helixManager;
- try
- {
- helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
- instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
- helixManager.getStateMachineEngine().registerStateModelFactory(
- "LeaderStandby", taskStateModelFactory);
- helixManager.getStateMachineEngine().registerStateModelFactory(
- "OnlineOffline", appStateModelFactory);
- helixManager.connect();
- helixManager.addExternalViewChangeListener((RoutingTableProvider)cluster);
- } catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+
+ signalOneAppLoaded.await();
+ }
+
+    /**
+     * Connects this node to the Helix cluster as a PARTICIPANT. Both state model factories are registered before
+     * connecting, and the cluster object is subscribed to external view changes once connected.
+     * <p>
+     * Failures are logged and not propagated to the caller.
+     */
+    private void registerWithHelix() {
+        try {
+            HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                    InstanceType.PARTICIPANT, zookeeperAddress);
+            helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
+            helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
+            helixManager.connect();
+            // the cast fails (and is reported below) if the injected cluster is not a RoutingTableProvider
+            helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
+        } catch (Exception e) {
+            // Report through the class logger, with the cause and the identity of the node, instead of dumping the
+            // stack trace on stderr. Still not rethrown, so the caller proceeds exactly as before.
+            logger.error("Cannot register instance [" + instanceName + "] with Helix cluster [" + clusterName + "]", e);
+        }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
index db84c35..ab2be80 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -3,7 +3,6 @@ package org.apache.s4.core;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
-import org.apache.s4.comm.util.ArchiveFetchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +50,11 @@ public class S4Node {
Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
"default.s4.base.properties").openStream(), nodeArgs.clusterName, nodeArgs.instanceName) });
- S4Bootstrap bootstrap = injector.getInstance(S4Bootstrap.class);
+ Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
try {
bootstrap.start(injector);
- } catch (ArchiveFetchException e1) {
- logger.error("Cannot fetch module dependencies.", e1);
+ } catch (Exception e1) {
+ logger.error("Cannot start node ", e1);
}
}
@@ -74,7 +73,7 @@ public class S4Node {
@Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
String zkConnectionString;
-
+
@Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
String instanceName = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index d344698..c5917b7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -27,7 +27,7 @@ import java.util.jar.JarFile;
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.base.util.S4RLoader;
import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.DeploymentManager;
import org.slf4j.Logger;
@@ -57,7 +57,7 @@ public class Server {
private DeploymentManager deploymentManager;
@Inject
- private AssignmentFromZK assignment;
+ private Assignment assignment;
private final ZkClient zkClient;
@@ -75,6 +75,10 @@ public class Server {
zkClient.setZkSerializer(new ZNRecordSerializer());
}
+ /**
+ * Stores the given injector in the {@code injector} field of this server, replacing any previous value.
+ *
+ * @param injector the injector to keep
+ */
+ public void setInjector(Injector injector) {
+ this.injector = injector;
+ }
+
public void start(Injector injector) throws Exception {
this.injector = injector;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
index a31ff17..5b9b0e7 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
@@ -7,6 +7,8 @@ import java.util.Map;
import org.apache.s4.comm.topology.ZNRecord;
+import com.beust.jcommander.internal.Maps;
+
public class AppConfig {
public static final String NAMED_PARAMETERS = "namedParams";
@@ -70,6 +72,10 @@ public class AppConfig {
return namedParameters;
}
+    /**
+     * Tells whether this configuration names an application and gives at least one way to obtain it.
+     *
+     * @return {@code true} when {@code appName} is set and {@code appClassName} or {@code appURI} is set
+     */
+    public boolean isValid() {
+        if (appName == null) {
+            return false;
+        }
+        return appClassName != null || appURI != null;
+    }
+
public String getNamedParametersAsString() {
if (namedParameters == null || namedParameters.isEmpty()) {
return "";
@@ -104,6 +110,24 @@ public class AppConfig {
return record;
}
+    /**
+     * Flattens this configuration into a map of string properties.
+     * <p>
+     * Module class names and module URIs are each written as a single string in which every item is followed by a
+     * comma. The application class is written under {@code APP_CLASS} when it is set.
+     *
+     * @return a new mutable map holding the properties of this configuration
+     */
+    public Map<String, String> asMap() {
+        // plain JDK map: no need to reach into the internal package of the command line parser
+        Map<String, String> result = new java.util.HashMap<String, String>();
+        result.put(APP_NAME, appName);
+        if (appClassName != null) {
+            // without this entry the application class is lost for whoever rebuilds the configuration from the map
+            result.put(APP_CLASS, appClassName);
+        }
+        result.put(APP_URI, appURI);
+        result.put(MODULES_CLASSES, commaTerminated(customModulesNames));
+        result.put(MODULES_URIS, commaTerminated(customModulesURIs));
+        result.put(NAMED_PARAMETERS, getNamedParametersAsString());
+        return result;
+    }
+
+    /** Appends every item followed by a comma; a {@code null} collection gives an empty string. */
+    private static String commaTerminated(Iterable<String> items) {
+        StringBuilder sb = new StringBuilder();
+        if (items != null) {
+            for (String item : items) {
+                sb.append(item).append(',');
+            }
+        }
+        return sb.toString();
+    }
+
@Override
public String toString() {
return "app name: [" + appName + "] \n " + "app class: [" + appClassName + "] \n" + "app URI : [" + appURI
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 5f21f51..b70c27f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -1,9 +1,10 @@
package org.apache.s4.deploy;
+import java.io.File;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScope;
@@ -14,60 +15,96 @@ import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
+import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.comm.ModulesLoaderFactory;
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.S4Bootstrap;
+import org.apache.s4.core.S4HelixBootstrap;
import org.apache.s4.core.util.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.inject.Injector;
+
@StateModelInfo(states = { "ONLINE,OFFLINE" }, initialState = "OFFLINE")
public class AppStateModel extends StateModel {
- private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
- private final String appName;
- private DeploymentManager deploymentManager;
-
- public AppStateModel(DeploymentManager deploymentManager, String appName) {
- this.deploymentManager = deploymentManager;
- this.appName = appName;
- }
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void deploy(Message message, NotificationContext context)
- throws Exception {
- logger.info("Deploying app:"+ appName);
- HelixManager manager = context.getManager();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- AppConfig config = createAppConfig(manager, configAccessor);
- deploymentManager.deploy(config);
- logger.info("Deployed app:"+ appName);
-
- }
-
- private AppConfig createAppConfig(HelixManager manager,
- ConfigAccessor configAccessor) {
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(manager.getClusterName())
- .forResource(appName).build();
- String appURI = configAccessor.get(scope,
- DeploymentManager.S4R_URI);
- String clusterName = manager.getClusterName();
- String appClassName = null;
- List<String> customModulesNames = new ArrayList<String>();
- List<String> customModulesURIs = new ArrayList<String>();
- Map<String, String> namedParameters = new HashMap<String, String>();
- AppConfig config = new AppConfig(clusterName, appClassName, appURI,
- customModulesNames, customModulesURIs, namedParameters);
- return config;
- }
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void undeploy(Message message, NotificationContext context)
- throws Exception {
- logger.info("Undeploying app:"+ appName);
- HelixManager manager = context.getManager();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- AppConfig config = createAppConfig(manager, configAccessor);
- deploymentManager.undeploy(config);
- logger.info("Undeploying app:"+ appName);
-
- }
+ private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+ private final String appName;
+ private final ArchiveFetcher fetcher;
+
+ /**
+ * Creates the state model of one application.
+ *
+ * @param appName
+ * name used as the Helix resource when reading the application configuration, and in log messages
+ * @param fetcher
+ * archive fetcher passed along when the custom modules of the application are fetched
+ */
+ public AppStateModel(String appName, ArchiveFetcher fetcher) {
+ this.appName = appName;
+ this.fetcher = fetcher;
+ }
+
+    /**
+     * Handles the OFFLINE to ONLINE transition: reads the configuration of the application from Helix, then loads its
+     * modules and starts it with the root injector published by {@link S4HelixBootstrap}.
+     */
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void deploy(Message message, NotificationContext context) throws Exception {
+        logger.info("Deploying app:" + appName);
+        final HelixManager helixManager = context.getManager();
+        final AppConfig configuration = createAppConfig(helixManager, helixManager.getConfigAccessor());
+        loadModulesAndStartNode(S4HelixBootstrap.rootInjector, configuration);
+        logger.info("Deployed app:" + appName);
+    }
+
+    /**
+     * Builds the application configuration from the properties stored in Helix for the resource named after this
+     * application, within the cluster of the given manager.
+     */
+    private AppConfig createAppConfig(HelixManager manager, ConfigAccessor configAccessor) {
+        final ConfigScope resourceScope = new ConfigScopeBuilder().forCluster(manager.getClusterName())
+                .forResource(appName).build();
+
+        // properties are read in the same order as they are handed to the builder
+        final String appClass = configAccessor.get(resourceScope, AppConfig.APP_CLASS);
+        final String name = configAccessor.get(resourceScope, AppConfig.APP_NAME);
+        final List<String> moduleClasses = getListFromCommaSeparatedValues(configAccessor.get(resourceScope,
+                AppConfig.MODULES_CLASSES));
+        final List<String> moduleURIs = getListFromCommaSeparatedValues(configAccessor.get(resourceScope,
+                AppConfig.MODULES_URIS));
+        final String appURI = configAccessor.get(resourceScope, AppConfig.APP_URI);
+
+        return new AppConfig.Builder().appClassName(appClass).appName(name).customModulesNames(moduleClasses)
+                .customModulesURIs(moduleURIs).appURI(appURI).build();
+    }
+
+    /**
+     * Splits a comma-separated string into its items.
+     *
+     * @param values
+     *            the string to split, possibly {@code null}
+     * @return an empty list for a {@code null} or empty string, otherwise a fixed-size list of the items
+     */
+    private static List<String> getListFromCommaSeparatedValues(String values) {
+        final boolean nothingToSplit = values == null || values.isEmpty();
+        return nothingToSplit ? Collections.<String> emptyList() : Arrays.asList(values.split("[,]"));
+    }
+
+    /**
+     * Handles the ONLINE to OFFLINE transition. Undeployment is not implemented: the transition is only logged and
+     * nothing is done to the application.
+     */
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void undeploy(Message message, NotificationContext context) throws Exception {
+        logger.info("Undeploying app:" + appName);
+        // TODO actually stop the application. Until then no configuration is read (it was built and never used) and
+        // the outcome is stated explicitly instead of repeating the "Undeploying" message.
+        logger.warn("Undeployment is not implemented, nothing was done for app:" + appName);
+    }
+
+    /**
+     * Fetches every custom module of the application to a local file, builds a modules loader over those files and
+     * starts the application from a new thread named "S4 platform loader".
+     *
+     * @param parentInjector
+     *            injector handed to the application start
+     * @param appConfig
+     *            configuration of the application to start
+     * @throws ArchiveFetchException
+     *             declared for failures while fetching a module
+     */
+    private void loadModulesAndStartNode(final Injector parentInjector, final AppConfig appConfig)
+            throws ArchiveFetchException {
+        // name taken from the configuration, not from the appName field of this state model
+        final String configuredAppName = appConfig.getAppName();
+
+        final List<File> localModules = new ArrayList<File>();
+        for (String moduleUri : appConfig.getCustomModulesURIs()) {
+            File localCopy = S4Bootstrap.fetchModuleAndCopyToLocalFile(configuredAppName, moduleUri, fetcher);
+            localModules.add(localCopy);
+        }
+        final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(localModules);
+
+        final Runnable startApp = new Runnable() {
+            @Override
+            public void run() {
+                // load app class through modules classloader and start it
+                S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+            }
+        };
+        new Thread(startApp, "S4 platform loader").start();
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
index 4f6dc00..2aa376a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -2,24 +2,23 @@ package org.apache.s4.deploy;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.s4.comm.util.ArchiveFetcher;
-import org.apache.s4.core.Server;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
- private final DeploymentManager deploymentManager;
- @Inject
- public AppStateModelFactory(DeploymentManager deploymentManager, ArchiveFetcher fetcher) {
- this.deploymentManager = deploymentManager;
+ private final ArchiveFetcher fetcher;
+ @Inject
+ public AppStateModelFactory(ArchiveFetcher fetcher) {
+ this.fetcher = fetcher;
}
@Override
public AppStateModel createNewStateModel(String partitionName) {
- return new AppStateModel(deploymentManager,partitionName);
+ return new AppStateModel(partitionName, fetcher);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index caec699..c5ba6ed 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -11,7 +11,7 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
private final Server server;
boolean deployed = false;
private final String clusterName;
- private ArchiveFetcher fetcher;
+ private final ArchiveFetcher fetcher;
@Inject
public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
@@ -20,23 +20,22 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
this.clusterName = clusterName;
this.server = server;
- this.fetcher = fetcher;
+ this.fetcher = fetcher;
}
@Override
public void start() {
-
}
- @Override
- public void deploy(AppConfig appConfig) throws DeploymentFailedException {
- DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
- }
+ @Override
+ public void deploy(AppConfig appConfig) throws DeploymentFailedException {
+ DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+ }
- @Override
- public void undeploy(AppConfig appConfig) {
-
- }
+ @Override
+ public void undeploy(AppConfig appConfig) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-core/src/main/resources/default.s4.core.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/default.s4.core.properties b/subprojects/s4-core/src/main/resources/default.s4.core.properties
index cb5e20a..8b13789 100644
--- a/subprojects/s4-core/src/main/resources/default.s4.core.properties
+++ b/subprojects/s4-core/src/main/resources/default.s4.core.properties
@@ -1 +1 @@
-s4.logger_level = DEBUG
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 76a1d43..01eec0f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -135,7 +135,7 @@ public class Deploy extends S4ArgsBase {
}
- private static Map<String, String> convertListArgsToMap(List<String> args) {
+ public static Map<String, String> convertListArgsToMap(List<String> args) {
Map<String, String> result = Maps.newHashMap();
for (String arg : args) {
String[] split = arg.split("[=]");
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/24a11e18/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index 813f13d..cc3d128 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -1,13 +1,13 @@
package org.apache.s4.tools.helix;
import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
@@ -15,15 +15,25 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.tools.IdealStateCalculatorByShuffling;
-import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.comm.HelixBasedCommModule;
+import org.apache.s4.core.HelixBasedCoreModule;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.tools.Deploy;
+import org.apache.s4.tools.Deploy.InlineConfigParameterConverter;
import org.apache.s4.tools.S4ArgsBase;
import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
public class DeployApp extends S4ArgsBase {
+
+ private static Logger logger = LoggerFactory.getLogger(DeployApp.class);
+
public static void main(String[] args) {
DeployAppArgs deployArgs = new DeployAppArgs();
@@ -31,10 +41,40 @@ public class DeployApp extends S4ArgsBase {
HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ // ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
Map<String, String> properties = new HashMap<String, String>();
- properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
+
+ URI s4rURI = null;
+ if (deployArgs.s4rPath != null) {
+ try {
+ s4rURI = new URI(deployArgs.s4rPath);
+ } catch (URISyntaxException e) {
+ logger.error("Cannot get URI from s4r parameter: {}", deployArgs.s4rPath);
+ return;
+ }
+ if (Strings.isNullOrEmpty(s4rURI.getScheme())) {
+ // default is file
+ s4rURI = new File(deployArgs.s4rPath).toURI();
+ }
+ logger.info(
+ "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+ s4rURI.toString());
+ } else {
+ throw new RuntimeException("Not specifying s4r URI (as -s4r parameter) is not supported yet");
+ }
+
+ ImmutableList<String> helixModules = ImmutableList.of(HelixBasedCommModule.class.getName(),
+ HelixBasedCoreModule.class.getName());
+ // TODO merge with custom modules
+
+        // Publish the normalized URI: s4rURI is given the file: scheme when the -s4r argument has none, whereas the
+        // raw argument would be stored as typed. s4rURI is never null here, the other branch above throws.
+        // An empty list rather than null is passed for the module URIs, so that the configuration can be iterated.
+        AppConfig appConfig = new AppConfig.Builder().appClassName(deployArgs.appClass).appName(deployArgs.appName)
+                .appURI(s4rURI.toString()).customModulesNames(helixModules)
+                .customModulesURIs(ImmutableList.<String> of())
+                .namedParameters(Deploy.convertListArgsToMap(deployArgs.extraNamedParameters)).build();
+ // properties.put("appConfig", appConfig.asMap());
+ // properties.put(DistributedDeploymentManager.S4R_URI, new File(deployArgs.s4rPath).toURI().toString());
properties.put("type", "App");
+ properties.putAll(appConfig.asMap());
admin.setConfig(scope, properties);
IdealState is = admin.getResourceIdealState(deployArgs.clusterName, deployArgs.appName);
@@ -78,8 +118,14 @@ public class DeployApp extends S4ArgsBase {
@Parameter(names = { "-appName" }, description = "Name of the App", required = true, arity = 1)
String appName;
+ @Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application class (extending App or AdapterApp)", required = false)
+ String appClass = "";
+
@Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the App needs to be deployed", required = false, arity = 1)
String nodeGroup = "default";
+ @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
+ List<String> extraNamedParameters = new ArrayList<String>();
+
}
}