You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2012/12/20 19:06:49 UTC
[2/19] git commit: Renaming s4statemodel factory to taskstatemodelfactory
Renaming s4statemodel factory to taskstatemodelfactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/008e5b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/008e5b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/008e5b0e
Branch: refs/heads/S4-110
Commit: 008e5b0e732101d76946345ffb7cf7d52ca561fb
Parents: 34dff89
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Dec 11 18:26:32 2012 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Dec 11 18:26:32 2012 -0800
----------------------------------------------------------------------
.../org/apache/s4/comm/HelixBasedCommModule.java | 4 +-
.../apache/s4/comm/helix/S4StateModelFactory.java | 13 -
.../s4/comm/helix/TaskStateModelFactory.java | 13 +
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 392 +++++++-------
.../java/org/apache/s4/comm/tcp/TCPListener.java | 3 +-
.../s4/comm/topology/AssignmentFromHelix.java | 4 +-
.../java/org/apache/s4/core/DefaultCoreModule.java | 2 +-
.../src/main/java/org/apache/s4/core/Server.java | 4 +-
8 files changed, 217 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index 1a979a8..c9cfcb7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -13,7 +13,7 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.Listener;
import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Assignment;
@@ -74,7 +74,7 @@ public class HelixBasedCommModule extends AbstractModule{
// a node holds a single partition assignment
// ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(S4StateModelFactory.class);
+ bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(TaskStateModelFactory.class);
bind(Assignment.class).to(AssignmentFromHelix.class);
bind(Cluster.class).to(ClusterFromHelix.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
deleted file mode 100644
index c823bd7..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModelFactory.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.s4.comm.helix;
-
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-
-public class S4StateModelFactory extends StateModelFactory<S4StateModel>{
-
- @Override
- public S4StateModel createNewStateModel(String partitionName) {
- return new S4StateModel(partitionName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
new file mode 100644
index 0000000..eef64ea
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.comm.helix;
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel>{
+
+ @Override
+ public S4StateModel createNewStateModel(String partitionName) {
+ return new S4StateModel(partitionName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5e35588..a0e5002 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -65,203 +65,203 @@ import org.apache.helix.model.InstanceConfig;
*/
public class TCPEmitter implements Emitter, ClusterChangeListener {
- private static final Logger logger = LoggerFactory
- .getLogger(TCPEmitter.class);
-
- private final int nettyTimeout;
-
- private Cluster topology;
- private final ClientBootstrap bootstrap;
-
- /*
- * All channels
- */
- private final ChannelGroup channels = new DefaultChannelGroup();
-
- /*
- * Channel used to send messages to each Node
- */
- private final BiMap<InstanceConfig, Channel> nodeChannelMap;
-
- // lock for synchronizing between cluster updates callbacks and other code
- private final Lock lock;
-
- @Inject
- SerializerDeserializer serDeser= new KryoSerDeser();
-
- @Inject
- public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
- throws InterruptedException {
- this.nettyTimeout = timeout;
- this.topology = topology;
- this.lock = new ReentrantLock();
-
- // Initialize data structures
- //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
- // TODO cluster can grow in size
- nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
- .<InstanceConfig, Channel> create());
-
- // Initialize netty related structures
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("1", new LengthFieldPrepender(4));
- p.addLast("2", new ExceptionHandler());
- return p;
- }
- });
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
- }
-
- @Inject
- private void init() {
- this.topology.addListener(this);
- }
-
- private boolean connectTo(InstanceConfig config) {
-
- if (config == null) {
-
- logger.error("Invalid clusterNode");
- return false;
- }
-
- try {
- ChannelFuture connectFuture = this.bootstrap
- .connect(new InetSocketAddress(config.getHostName(),
- Integer.parseInt(config.getPort())));
- connectFuture.await();
- if (connectFuture.isSuccess()) {
- channels.add(connectFuture.getChannel());
- nodeChannelMap
- .forcePut(config, connectFuture.getChannel());
- return true;
- }
- } catch (InterruptedException ie) {
- logger.error(String.format("Interrupted while connecting to %s:%d",
- config.getHostName(), config.getPort()));
- Thread.currentThread().interrupt();
- }
- return false;
- }
-
- private void sendMessage(String streamName, int partitionId, byte[] message) {
- ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
- buffer.writeBytes(message);
- InstanceConfig config = topology
- .getDestination(streamName, partitionId);
- if (!nodeChannelMap.containsKey(config)) {
- if (!connectTo(config)) {
- // Couldn't connect, discard message
- return;
- }
- }
-
- Channel c = nodeChannelMap.get(config);
- if (c == null)
- return;
-
- c.write(buffer).addListener(new MessageSendingListener(partitionId));
- }
-
- @Override
- public boolean send(int partitionId, EventMessage message) {
- sendMessage(message.getStreamName(), partitionId,
- serDeser.serialize(message));
- return true;
- }
-
- protected void removeChannel(int partition) {
- Channel c = nodeChannelMap.remove(partition);
- if (c == null) {
- return;
- }
- c.close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (future.isSuccess())
- channels.remove(future.getChannel());
- else
- logger.error("Failed to close channel");
- }
- });
- }
-
- public void close() {
- try {
- channels.close().await();
- bootstrap.releaseExternalResources();
- } catch (InterruptedException ie) {
- logger.error("Interrupted while closing");
- Thread.currentThread().interrupt();
- }
- }
-
- //@Override
- public int getPartitionCount() {
- return topology.getPhysicalCluster().getPartitionCount();
- }
- public int getPartitionCount(String streamName) {
+ private static final Logger logger = LoggerFactory
+ .getLogger(TCPEmitter.class);
+
+ private final int nettyTimeout;
+
+ private Cluster topology;
+ private final ClientBootstrap bootstrap;
+
+ /*
+ * All channels
+ */
+ private final ChannelGroup channels = new DefaultChannelGroup();
+
+ /*
+ * Channel used to send messages to each Node
+ */
+ private final BiMap<InstanceConfig, Channel> nodeChannelMap;
+
+ // lock for synchronizing between cluster updates callbacks and other code
+ private final Lock lock;
+
+ @Inject
+ SerializerDeserializer serDeser= new KryoSerDeser();
+
+ @Inject
+ public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
+ throws InterruptedException {
+ this.nettyTimeout = timeout;
+ this.topology = topology;
+ this.lock = new ReentrantLock();
+
+ // Initialize data structures
+ //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ // TODO cluster can grow in size
+ nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
+ .<InstanceConfig, Channel> create());
+
+ // Initialize netty related structures
+ ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("1", new LengthFieldPrepender(4));
+ p.addLast("2", new ExceptionHandler());
+ return p;
+ }
+ });
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+ }
+
+ @Inject
+ private void init() {
+ this.topology.addListener(this);
+ }
+
+ private boolean connectTo(InstanceConfig config) {
+
+ if (config == null) {
+
+ logger.error("Invalid clusterNode");
+ return false;
+ }
+
+ try {
+ ChannelFuture connectFuture = this.bootstrap
+ .connect(new InetSocketAddress(config.getHostName(),
+ Integer.parseInt(config.getPort())));
+ connectFuture.await();
+ if (connectFuture.isSuccess()) {
+ channels.add(connectFuture.getChannel());
+ nodeChannelMap
+ .forcePut(config, connectFuture.getChannel());
+ return true;
+ }
+ } catch (InterruptedException ie) {
+ logger.error(String.format("Interrupted while connecting to %s:%d",
+ config.getHostName(), config.getPort()));
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ private void sendMessage(String streamName, int partitionId, byte[] message) {
+ ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+ buffer.writeBytes(message);
+ InstanceConfig config = topology
+ .getDestination(streamName, partitionId);
+ if (!nodeChannelMap.containsKey(config)) {
+ if (!connectTo(config)) {
+ // Couldn't connect, discard message
+ return;
+ }
+ }
+
+ Channel c = nodeChannelMap.get(config);
+ if (c == null)
+ return;
+
+ c.write(buffer).addListener(new MessageSendingListener(partitionId));
+ }
+
+ @Override
+ public boolean send(int partitionId, EventMessage message) {
+ sendMessage(message.getStreamName(), partitionId,
+ serDeser.serialize(message));
+ return true;
+ }
+
+ protected void removeChannel(int partition) {
+ Channel c = nodeChannelMap.remove(partition);
+ if (c == null) {
+ return;
+ }
+ c.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (future.isSuccess())
+ channels.remove(future.getChannel());
+ else
+ logger.error("Failed to close channel");
+ }
+ });
+ }
+
+ public void close() {
+ try {
+ channels.close().await();
+ bootstrap.releaseExternalResources();
+ } catch (InterruptedException ie) {
+ logger.error("Interrupted while closing");
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ //@Override
+ public int getPartitionCount() {
+ return topology.getPhysicalCluster().getPartitionCount();
+ }
+ public int getPartitionCount(String streamName) {
return topology.getPhysicalCluster().getPartitionCount();
}
- class ExceptionHandler extends SimpleChannelUpstreamHandler {
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Throwable t = e.getCause();
- if (t instanceof ClosedChannelException) {
- nodeChannelMap.inverse().remove(e.getChannel());
- return;
- } else if (t instanceof ConnectException) {
- nodeChannelMap.inverse().remove(e.getChannel());
- return;
- } else {
- logger.error("Unexpected exception", t);
- }
- }
- }
-
- class MessageSendingListener implements ChannelFutureListener {
-
- int partitionId = -1;
-
- public MessageSendingListener(int partitionId) {
- super();
- this.partitionId = partitionId;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- try {
- // TODO handle possible cluster reconfiguration between send
- // and failure callback
- logger.warn(
- "Failed to send message to node {} (according to current cluster information)",
- topology.getPhysicalCluster().getNodes()
- .get(partitionId));
- } catch (IndexOutOfBoundsException ignored) {
- // cluster was changed
- }
- }
-
- }
- }
-
- @Override
- public void onChange() {
-
- }
+ class ExceptionHandler extends SimpleChannelUpstreamHandler {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Throwable t = e.getCause();
+ if (t instanceof ClosedChannelException) {
+ nodeChannelMap.inverse().remove(e.getChannel());
+ return;
+ } else if (t instanceof ConnectException) {
+ nodeChannelMap.inverse().remove(e.getChannel());
+ return;
+ } else {
+ logger.error("Unexpected exception", t);
+ }
+ }
+ }
+
+ class MessageSendingListener implements ChannelFutureListener {
+
+ int partitionId = -1;
+
+ public MessageSendingListener(int partitionId) {
+ super();
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ try {
+ // TODO handle possible cluster reconfiguration between send
+ // and failure callback
+ logger.warn(
+ "Failed to send message to node {} (according to current cluster information)",
+ topology.getPhysicalCluster().getNodes()
+ .get(partitionId));
+ } catch (IndexOutOfBoundsException ignored) {
+ // cluster was changed
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public void onChange() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 7259079..d495033 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -64,8 +64,7 @@ public class TCPListener implements Listener {
@Inject
public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout) {
-System.out.println("TCPListener.TCPListener()");
- // wait for an assignment
+ // wait for an assignment
node = assignment.assignClusterNode();
nettyTimeout = timeout;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 7b39701..7e0b4ac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -26,7 +26,7 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +60,7 @@ public class AssignmentFromHelix implements Assignment
@Named("s4.cluster.zk_address") String zookeeperAddress
) throws Exception
{
- this.taskStateModelFactory = new S4StateModelFactory();
+ this.taskStateModelFactory = new TaskStateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
this.clusterName = clusterName;
this.zookeeperAddress = zookeeperAddress;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index a284aad..97ec9af 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -30,7 +30,7 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreamsManager;
import org.apache.s4.comm.topology.RemoteStreamsManagerImpl;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/008e5b0e/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 666b7e7..bc172a5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -31,7 +31,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.base.util.S4RLoader;
import org.apache.s4.base.util.S4RLoaderFactory;
-import org.apache.s4.comm.helix.S4StateModelFactory;
+import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -77,7 +77,7 @@ public class Server {
private final String instanceName;
@Inject
- private S4StateModelFactory taskStateModelFactory;
+ private TaskStateModelFactory taskStateModelFactory;
@Inject
private AppStateModelFactory appStateModelFactory;