You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/08 05:28:05 UTC
git commit: [S4-110] Working version with Helix using
GenericEventAdapter. Needs some more cleanup and refactoring
Updated Branches:
refs/heads/S4-110-new 423b7e873 -> 2b718c98b
[S4-110] Working version with Helix using GenericEventAdapter. Needs some more cleanup and refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2b718c98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2b718c98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2b718c98
Branch: refs/heads/S4-110-new
Commit: 2b718c98bee855ab51b8558d9f88d6e9eb4aaafa
Parents: 423b7e8
Author: Kishore G <ki...@apache.org>
Authored: Thu Feb 7 21:26:30 2013 -0800
Committer: Kishore Gopalakrishna <kg...@kgopalak-ld.linkedin.biz>
Committed: Thu Feb 7 21:27:43 2013 -0800
----------------------------------------------------------------------
.../org/apache/s4/comm/HelixBasedCommModule.java | 86 +++++++++-
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 11 +-
.../java/org/apache/s4/comm/tcp/TCPListener.java | 5 +-
.../apache/s4/comm/topology/ClusterFromHelix.java | 128 ++++++++++++---
.../org/apache/s4/comm/util/EmitterMetrics.java | 2 +-
.../java/org/apache/s4/core/DefaultCoreModule.java | 5 +-
.../org/apache/s4/core/HelixBasedCoreModule.java | 69 ++++++++-
.../java/org/apache/s4/core/S4HelixBootstrap.java | 76 +++++++++-
.../java/org/apache/s4/deploy/AppStateModel.java | 2 +-
.../apache/s4/tools/helix/GenericEventAdapter.java | 1 +
10 files changed, 340 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index b2e92c7..7084f1e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -1,18 +1,37 @@
package org.apache.s4.comm;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.comm.helix.TaskStateModelFactory;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
+import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.ClustersFromHelix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
public class HelixBasedCommModule extends AbstractModule {
- private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
+ private static Logger logger = LoggerFactory.getLogger(HelixBasedCommModule.class);
+ private InputStream commConfigInputStream;
+ private PropertiesConfiguration config;
+
/**
*
@@ -22,19 +41,68 @@ public class HelixBasedCommModule extends AbstractModule {
* the name of the cluster to which the current node belongs. If specified in the configuration file,
* this parameter will be ignored.
*/
- public HelixBasedCommModule() {
+ public HelixBasedCommModule(InputStream commConfigInputStream) {
+ this.commConfigInputStream = commConfigInputStream;
}
+ @SuppressWarnings("unchecked")
@Override
protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ if (commConfigInputStream != null) {
+ try {
+ commConfigInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ /* The hashing function to map keys top partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+ /* Use Kryo to serialize events. */
+ // we use a factory for generating the serdeser instance in order to use runtime parameters such as the
+ // classloader
+ install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
+ SerializerDeserializerFactory.class));
- // a node holds a single partition assignment
- // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
- TaskStateModelFactory.class);
+ // bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+ bind(Clusters.class).to(ClustersFromHelix.class).in(Scopes.SINGLETON);
+ bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
- bind(Clusters.class).to(ClustersFromHelix.class);
+ bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
+ try {
+ Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
+ .getString("s4.comm.emitter.class"));
+ bind(Emitter.class).to(emitterClass);
+
+ // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+ Class<? extends RemoteEmitter> remoteEmitterClass = (Class<? extends RemoteEmitter>) Class.forName(config
+ .getString("s4.comm.emitter.remote.class"));
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+ RemoteEmitterFactory.class));
+
+ } catch (ClassNotFoundException e) {
+ logger.error("Cannot find class implementation ", e);
+ }
}
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(commConfigInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 8133316..f0546a6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -120,8 +120,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
this.lock = new ReentrantLock();
// Initialize data structures
- int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
- destinationChannelMap = HashBiMap.create(clusterSize);
+ // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ destinationChannelMap = HashBiMap.create();
// Initialize netty related structures
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -182,7 +182,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
if (!destinationChannelMap.containsKey(destination)) {
if (!connectTo((TCPDestination) destination)) {
- logger.warn("Could not connect to partition {}, discarding message", destination);
+ logger.warn("Could not connect to destination {}, discarding message", destination);
// Couldn't connect, discard message
return false;
}
@@ -308,10 +308,9 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
// cluster was changed
}
} else {
- metrics.sentMessage(destination);
-
+ //TODO:
+ //metrics.sentMessage(destination);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index c7673ae..b389a70 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -77,6 +77,7 @@ public class TCPListener implements Listener {
public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, final Receiver receiver,
final DeserializerExecutorFactory deserializerExecutorFactory) {
// wait for an assignment
+ logger.info("Initializing tcplistener");
node = assignment.assignClusterNode();
nettyTimeout = timeout;
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -104,6 +105,8 @@ public class TCPListener implements Listener {
Channel c = bootstrap.bind(new InetSocketAddress(node.getPort()));
channels.add(c);
+ logger.info("Initialized tcplistener at "+ node);
+
}
@Override
@@ -155,6 +158,6 @@ public class TCPListener implements Listener {
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
}
-
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 639c95a..2ba5b9a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -38,12 +39,12 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.base.Destination;
+import org.apache.s4.comm.tcp.TCPDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.name.Named;
-import com.sun.org.apache.bcel.internal.generic.NEW;
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies
@@ -61,7 +62,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
private final Lock lock;
private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
// Map of destination type to streamName to partitionId to Destination
- private final AtomicReference<Map<String, Map<String, Map<String, Destination>>>> destinationInfoMapRef;
+ private final AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>> destinationInfoMapRef;
/**
* only the local topology
@@ -77,9 +78,9 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
this.clusterRef = new AtomicReference<PhysicalCluster>();
this.listeners = new ArrayList<ClusterChangeListener>();
- Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+ Map<String, Map<String, Map<Integer, Destination>>> destinationMap = Collections
.emptyMap();
- destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+ destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>>(
destinationMap);
lock = new ReentrantLock();
@@ -95,9 +96,9 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
this.clusterRef = new AtomicReference<PhysicalCluster>();
this.listeners = new ArrayList<ClusterChangeListener>();
- Map<String, Map<String, Map<String, Destination>>> destinationMap = Collections
+ Map<String, Map<String, Map<Integer, Destination>>> destinationMap = Collections
.emptyMap();
- destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<String, Destination>>>>(
+ destinationInfoMapRef = new AtomicReference<Map<String, Map<String, Map<Integer, Destination>>>>(
destinationMap);
lock = new ReentrantLock();
@@ -118,24 +119,78 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
Builder keyBuilder = helixDataAccessor.keyBuilder();
List<String> resources = helixDataAccessor.getChildNames(keyBuilder
.idealStates());
- Map<String, Integer> map = new HashMap<String, Integer>();
- Map<String, Map<String, Map<String, Destination>>> destinationRoutingMap;
- destinationRoutingMap = new HashMap<String, Map<String,Map<String,Destination>>>();
- for (String resource : resources) {
- String resourceType = configAccessor.get(
- builder.forCluster(clusterName).forResource(resource)
- .build(), "type");
- if ("Task".equalsIgnoreCase(resourceType)) {
- String streamName = configAccessor.get(
- builder.forCluster(clusterName)
- .forResource(resource).build(),
+ Map<String, Integer> partitionCountMap = new HashMap<String, Integer>();
+
+ // populate the destinationRoutingMap
+ Map<String, Map<String, Map<Integer, Destination>>> destinationRoutingMap;
+ destinationRoutingMap = new HashMap<String, Map<String, Map<Integer, Destination>>>();
+
+ List<InstanceConfig> configList = helixDataAccessor
+ .getChildValues(keyBuilder.instanceConfigs());
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+ Map<String, Destination> tcpDestinationMap = new HashMap<String, Destination>();
+
+ Map<String, Destination> udpDestinationMap = new HashMap<String, Destination>();
+
+ for (InstanceConfig config : configList) {
+ instanceConfigMap.put(config.getId(), config);
+ try {
+ int port = Integer.parseInt(config.getPort());
+ Destination destination = new TCPDestination(-1, port,
+ config.getHostName(), config.getId());
+ tcpDestinationMap.put(config.getId(), destination);
+ udpDestinationMap.put(config.getId(), destination);
+ } catch (NumberFormatException e) {
+ logger.error("Invalid port:" + config, e);
+ }
+ }
+ if (externalViewList != null) {
+ for (ExternalView extView : externalViewList) {
+ String resource = extView.getId();
+ ConfigScope resourceScope = builder.forCluster(clusterName)
+ .forResource(resource).build();
+ String resourceType = configAccessor.get(resourceScope,
+ "type");
+ if (!"Task".equalsIgnoreCase(resourceType)) {
+ continue;
+ }
+ String streamName = configAccessor.get(resourceScope,
"streamName");
IdealState idealstate = helixDataAccessor
.getProperty(keyBuilder.idealStates(resource));
- map.put(streamName, idealstate.getNumPartitions());
+ partitionCountMap.put(streamName,
+ idealstate.getNumPartitions());
+ for (String partitionName : extView.getPartitionSet()) {
+ Map<String, String> stateMap = extView
+ .getStateMap(partitionName);
+ for (String instanceName : stateMap.keySet()) {
+ String currentState = stateMap.get(instanceName);
+ if (!currentState.equals("LEADER")) {
+ continue;
+ }
+ if (instanceConfigMap.containsKey(instanceName)) {
+ InstanceConfig instanceConfig = instanceConfigMap
+ .get(instanceName);
+ String destinationType = "tcp";
+ addDestination(destinationRoutingMap,
+ streamName, partitionName,
+ "tcp", tcpDestinationMap
+ .get(instanceConfig.getId()));
+ addDestination(destinationRoutingMap,
+ streamName, partitionName,
+ "tcp", udpDestinationMap
+ .get(instanceConfig.getId()));
+ } else {
+ logger.error("Invalid instance name."
+ + instanceName
+ + " .Not found in /cluster/configs/. instanceName: ");
+ }
+ }
+ }
}
}
- partitionCountMapRef.set(map);
+ destinationInfoMapRef.set(destinationRoutingMap);
+ partitionCountMapRef.set(partitionCountMap);
for (ClusterChangeListener listener : listeners) {
listener.onChange();
@@ -149,6 +204,36 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
}
}
+ private void addDestination(
+ Map<String, Map<String, Map<Integer, Destination>>> destinationRoutingMap,
+ String streamName, String partitionName, String destinationType,
+ Destination destination) {
+ if (!destinationRoutingMap
+ .containsKey(destinationType)) {
+ destinationRoutingMap
+ .put(destinationType,
+ new HashMap<String, Map<Integer, Destination>>());
+ }
+ Map<String, Map<Integer, Destination>> typeMap = destinationRoutingMap
+ .get(destinationType);
+ if (!typeMap.containsKey(streamName)) {
+ typeMap.put(streamName,
+ new HashMap<Integer, Destination>());
+ }
+ Map<Integer, Destination> streamMap = typeMap
+ .get(streamName);
+ String[] split = partitionName.split("_");
+ if (split.length == 2) {
+ try {
+ int partitionId = Integer
+ .parseInt(split[1]);
+ streamMap.put(partitionId, destination);
+ } catch (NumberFormatException e) {
+
+ }
+ }
+ }
+
@Override
public PhysicalCluster getPhysicalCluster() {
return clusterRef.get();
@@ -196,11 +281,12 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
public Destination getDestination(String streamName, int partitionId,
String destinationType) {
- Map<String, Map<String, Destination>> typeMap = destinationInfoMapRef.get().get(destinationType);
+ Map<String, Map<Integer, Destination>> typeMap = destinationInfoMapRef
+ .get().get(destinationType);
if (typeMap == null)
return null;
- Map<String, Destination> streamMap = typeMap.get(streamName);
+ Map<Integer, Destination> streamMap = typeMap.get(streamName);
if (streamMap == null)
return null;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
index 41777cb..c1889ff 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -22,7 +22,7 @@ public class EmitterMetrics {
}
public void sentMessage(Destination destination) {
- //TODO
+ //TODO:
/*
Map<String, Meter> map = emittersMetersMap.get(stream);
if (map == null) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 30e44b2..a7407e3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -86,11 +86,8 @@ public class DefaultCoreModule extends AbstractModule {
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
-
- bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
- AppStateModelFactory.class);
- bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+ bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
bind(S4RLoaderFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 01f086c..12ba248 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -19,19 +19,39 @@
package org.apache.s4.core;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.util.S4RLoaderFactory;
+import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.topology.HelixRemoteStreams;
import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.staging.ThrottlingSenderExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
import org.apache.s4.deploy.HelixBasedDeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
+import com.google.inject.name.Names;
/**
* Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
@@ -41,18 +61,49 @@ import com.google.inject.name.Named;
public class HelixBasedCoreModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(HelixBasedCoreModule.class);
+ private InputStream coreConfigFileInputStream;
+ private PropertiesConfiguration config;
- public HelixBasedCoreModule() {
+ public HelixBasedCoreModule(InputStream coreConfigFileInputStream) {
+ this.coreConfigFileInputStream = coreConfigFileInputStream;
}
@Override
protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ if (coreConfigFileInputStream != null) {
+ try {
+ coreConfigFileInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
+ /* The hashing function to map keys top partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+ bind(S4RLoaderFactory.class);
+
+ // For enabling checkpointing, one needs to use a custom module, such as
+ // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
+ bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
+
+ // shed load in local sender only by default
+ bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
+ bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);
+
+ bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+
bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
}
@Provides
@@ -65,5 +116,21 @@ public class HelixBasedCoreModule extends AbstractModule {
tmpS4Dir.getAbsolutePath());
return tmpS4Dir;
}
+ //TODO: move this to a base class
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(coreConfigFileInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
index 3944971..5e5c40f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -1,7 +1,12 @@
package org.apache.s4.core;
+import java.io.InputStream;
import java.net.Inet4Address;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,17 +18,28 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.HelixBasedCommModule;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParametersInjectionModule;
import org.apache.s4.deploy.AppStateModelFactory;
+import org.apache.s4.deploy.DeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Module;
import com.google.inject.name.Named;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Modules.OverriddenModuleBuilder;
/**
* This is the bootstrap for S4 nodes.
@@ -114,5 +130,63 @@ public class S4HelixBootstrap implements Bootstrap {
e.printStackTrace();
}
}
-
+ public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+ try {
+ Injector injector;
+ InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+ InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+
+ logger.info("Initializing S4 app with : {}", appConfig.toString());
+
+ AbstractModule commModule = new HelixBasedCommModule(commConfigFileInputStream);
+ AbstractModule coreModule = new HelixBasedCoreModule(coreConfigFileInputStream);
+
+ List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+ for (String moduleClass : appConfig.getCustomModulesNames()) {
+ extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+ }
+ Module combinedModule = Modules.combine(commModule, coreModule);
+ if (extraModules.size() > 0) {
+ OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+ combinedModule = overridenModuleBuilder.with(extraModules);
+ }
+
+ if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+
+ logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+ Map<String, String> namedParameters = new HashMap<String, String>();
+
+ namedParameters.putAll(appConfig.getNamedParameters());
+ combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+ }
+
+ if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
+ // In that case we won't be using an S4R classloader, app classes are available from the current
+ // classloader
+ // The app module provides bindings specific to the app class loader, in this case the current thread's
+ // class loader.
+ AppModule appModule = new AppModule(Thread.currentThread().getContextClassLoader());
+ // NOTE: because the app module can be overriden
+ combinedModule = Modules.override(appModule).with(combinedModule);
+ injector = parentInjector.createChildInjector(combinedModule);
+ logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+ App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+ app.init();
+ app.start();
+ } else {
+ injector = parentInjector.createChildInjector(combinedModule);
+ if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+ logger.info("S4 node in standby until app class or app URI is specified");
+ }
+ Server server = injector.getInstance(Server.class);
+ server.setInjector(injector);
+ DeploymentManager deploymentManager = injector.getInstance(DeploymentManager.class);
+ deploymentManager.deploy(appConfig);
+ // server.start(injector);
+ }
+ } catch (Exception e) {
+ logger.error("Cannot start S4 node", e);
+ System.exit(1);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index b70c27f..427d999 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -99,7 +99,7 @@ public class AppStateModel extends StateModel {
@Override
public void run() {
// load app class through modules classloader and start it
- S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+ S4HelixBootstrap.startS4App(appConfig, parentInjector, modulesLoader);
// signalOneAppLoaded.countDown();
}
}, "S4 platform loader");
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b718c98/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index e640066..b16d4e0 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -43,6 +43,7 @@ public class GenericEventAdapter {
int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
Event event = new Event();
event.put("name", String.class, "Hello world to partition:" + partitionId);
+ event.setStreamId("names");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
KryoSerDeser serializer = new KryoSerDeser(classLoader);
// EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));