You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[1/50] [abbrv] git commit: renamed netty to tcp
Updated Branches:
refs/heads/piper [created] 8b642d182
renamed netty to tcp
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/8b642d18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/8b642d18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/8b642d18
Branch: refs/heads/piper
Commit: 8b642d182862556539e3df4610307fdb20d66c0a
Parents: e228a8a
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Tue Dec 20 15:08:12 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Dec 21 11:12:15 2011 +0100
----------------------------------------------------------------------
.../org/apache/s4/comm/netty/NettyEmitter.java | 286 ---------------
.../org/apache/s4/comm/netty/NettyListener.java | 100 -----
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 286 +++++++++++++++
.../java/org/apache/s4/comm/tcp/TCPListener.java | 100 +++++
.../test/java/org/apache/s4/comm/TCPCommTest.java | 8 +-
.../ZkBasedClusterManagementTestModule.java | 11 +-
6 files changed, 394 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
deleted file mode 100644
index 15ff77d..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
+++ /dev/null
@@ -1,286 +0,0 @@
-package org.apache.s4.comm.netty;
-
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.Hashtable;
-import java.util.Queue;
-import java.util.concurrent.Executors;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.HashBiMap;
-import com.google.inject.Inject;
-
-public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
- private static final Logger logger = LoggerFactory.getLogger(NettyEmitter.class);
- private static final int BUFFER_SIZE = 10;
- private static final int NUM_RETRIES = 10;
-
- private Topology topology;
- private final ClientBootstrap bootstrap;
-
- static class MessageQueuesPerPartition {
- private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
- private boolean bounded;
-
- MessageQueuesPerPartition(boolean bounded) {
- this.bounded = bounded;
- }
-
- private boolean add(int partitionId, byte[] message) {
- Queue<byte[]> messages = queues.get(partitionId);
-
- if (messages == null) {
- messages = new ArrayDeque<byte[]>();
- queues.put(partitionId, messages);
- }
-
- if (bounded && messages.size() >= BUFFER_SIZE) {
- // Too many messages already queued
- return false;
- }
-
- messages.offer(message);
- return true;
- }
-
- private byte[] peek(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
-
- try {
- return messages.peek();
- } catch (NullPointerException npe) {
- return null;
- }
- }
-
- private void remove(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
-
- if (messages.isEmpty()) {
- logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
- return;
- }
-
- if (messages != null)
- messages.remove();
- }
- }
-
- private HashBiMap<Integer, Channel> partitionChannelMap;
- private HashBiMap<Integer, ClusterNode> partitionNodeMap;
- private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
-
- @Inject
- public NettyEmitter(Topology topology) throws InterruptedException {
- this.topology = topology;
- topology.addListener(this);
- int clusterSize = this.topology.getTopology().getNodes().size();
-
- partitionChannelMap = HashBiMap.create(clusterSize);
- partitionNodeMap = HashBiMap.create(clusterSize);
-
- ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
-
- bootstrap = new ClientBootstrap(factory);
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("1", new LengthFieldPrepender(4));
- p.addLast("2", new TestHandler());
- return p;
- }
- });
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- }
-
- private boolean connectTo(Integer partitionId) {
- ClusterNode clusterNode = partitionNodeMap.get(partitionId);
-
- if (clusterNode == null) {
- clusterNode = topology.getTopology().getNodes().get(partitionId);
- partitionNodeMap.forcePut(partitionId, clusterNode);
- }
-
- if (clusterNode == null) {
- logger.error("No ClusterNode exists for partitionId " + partitionId);
- return false;
- }
-
- for (int retries = 0; retries < NUM_RETRIES; retries++) {
- ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
- .getPort()));
- f.awaitUninterruptibly();
- if (f.isSuccess()) {
- partitionChannelMap.forcePut(partitionId, f.getChannel());
- return true;
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
- clusterNode.getPort()));
- }
- }
-
- return false;
- }
-
- private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
- ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
- buffer.writeBytes(message);
- ChannelFuture f = channel.write(buffer);
- f.addListener(this);
- }
-
- private final Object sendLock = new Object();
-
- @Override
- public boolean send(int partitionId, byte[] message) {
- Channel channel = partitionChannelMap.get(partitionId);
- if (channel == null) {
- if (connectTo(partitionId)) {
- channel = partitionChannelMap.get(partitionId);
- } else {
- // could not connect, queue to the partitionBuffer
- return queuedMessages.add(partitionId, message);
- }
- }
-
- /*
- * Try limiting the size of the send queue inside Netty
- */
- if (!channel.isWritable()) {
- synchronized (sendLock) {
- // check again now that we have the lock
- while (!channel.isWritable()) {
- try {
- sendLock.wait();
- } catch (InterruptedException ie) {
- return false;
- }
- }
- }
- }
-
- /*
- * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
- * buffered messages, and (3) the Current Message
- *
- * Once the channel returns success delete from the messagesOnTheWire
- */
- byte[] messageBeingSent = null;
- // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
- // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
- // }
-
- while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
- writeMessageToChannel(channel, partitionId, messageBeingSent);
- queuedMessages.remove(partitionId);
- }
-
- writeMessageToChannel(channel, partitionId, message);
- return true;
- }
-
- @Override
- public void operationComplete(ChannelFuture f) {
- int partitionId = partitionChannelMap.inverse().get(f.getChannel());
- if (f.isSuccess()) {
- // messagesOnTheWire.remove(partitionId);
- }
-
- if (f.isCancelled()) {
- logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
- } else if (!f.isSuccess()) {
- logger.error("Exception on I/O operation", f.getCause());
- logger.error(String.format("I/O on partition %d failed!", partitionId));
- partitionChannelMap.remove(partitionId);
- }
- }
-
- @Override
- public void onChange() {
- /*
- * Close the channels that correspond to changed partitions and update partitionNodeMap
- */
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
- Integer partition = clusterNode.getPartition();
- ClusterNode oldNode = partitionNodeMap.get(partition);
-
- if (oldNode != null && !oldNode.equals(clusterNode)) {
- partitionChannelMap.remove(partition).close();
- }
-
- partitionNodeMap.forcePut(partition, clusterNode);
- }
- }
-
- @Override
- public int getPartitionCount() {
- // Number of nodes is not same as number of partitions
- return topology.getTopology().getPartitionCount();
- }
-
- class TestHandler extends SimpleChannelHandler {
- @Override
- public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
- // logger.info(String.format("%08x %08x %08x", e.getValue(),
- // e.getChannel().getInterestOps(), Channel.OP_WRITE));
- synchronized (sendLock) {
- if (e.getChannel().isWritable()) {
- sendLock.notify();
- }
- }
- ctx.sendUpstream(e);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
- Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
- if (partitionId == null) {
- logger.error("Error on mystery channel!!");
- }
- logger.error("Error on channel to partition " + partitionId);
-
- try {
- throw event.getCause();
- } catch (ConnectException ce) {
- logger.error(ce.getMessage(), ce);
- } catch (Throwable err) {
- logger.error("Error", err);
- if (context.getChannel().isOpen()) {
- logger.error("Closing channel due to exception");
- context.getChannel().close();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
deleted file mode 100644
index 3abeb3c..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.s4.comm.netty;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.s4.base.Listener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-
-public class NettyListener implements Listener {
- private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
- private ClusterNode node;
- private static final Logger logger = LoggerFactory.getLogger(NettyListener.class);
-
- @Inject
- public NettyListener(Assignment assignment) {
- // wait for an assignment
- node = assignment.assignClusterNode();
-
- ChannelFactory factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
-
- ServerBootstrap bootstrap = new ServerBootstrap(factory);
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
- p.addLast("2", new ChannelHandler(handoffQueue));
-
- return p;
- }
- });
-
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
-
- bootstrap.bind(new InetSocketAddress(node.getPort()));
- }
-
- public byte[] recv() {
- try {
- return handoffQueue.take();
- } catch (InterruptedException e) {
- return null;
- }
- }
-
- public int getPartitionId() {
- return node.getPartition();
- }
-
- public class ChannelHandler extends SimpleChannelHandler {
- private BlockingQueue<byte[]> handoffQueue;
-
- public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
- this.handoffQueue = handOffQueue;
- }
-
- public void messageReceived(ChannelHandlerContext ctx,
- MessageEvent e) {
- ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
- try {
- handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
- // there's no receiver at the other end of the handoff queue
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
-
- public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
- logger.error("Error", event.getCause());
- if (context.getChannel().isOpen()) {
- logger.error("Closing channel due to exception");
- context.getChannel().close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
new file mode 100644
index 0000000..5960fc1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -0,0 +1,286 @@
+package org.apache.s4.comm.tcp;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Hashtable;
+import java.util.Queue;
+import java.util.concurrent.Executors;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashBiMap;
+import com.google.inject.Inject;
+
+public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
+ private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+ private static final int BUFFER_SIZE = 10;
+ private static final int NUM_RETRIES = 10;
+
+ private Topology topology;
+ private final ClientBootstrap bootstrap;
+
+ static class MessageQueuesPerPartition {
+ private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
+ private boolean bounded;
+
+ MessageQueuesPerPartition(boolean bounded) {
+ this.bounded = bounded;
+ }
+
+ private boolean add(int partitionId, byte[] message) {
+ Queue<byte[]> messages = queues.get(partitionId);
+
+ if (messages == null) {
+ messages = new ArrayDeque<byte[]>();
+ queues.put(partitionId, messages);
+ }
+
+ if (bounded && messages.size() >= BUFFER_SIZE) {
+ // Too many messages already queued
+ return false;
+ }
+
+ messages.offer(message);
+ return true;
+ }
+
+ private byte[] peek(int partitionId) {
+ Queue<byte[]> messages = queues.get(partitionId);
+
+ try {
+ return messages.peek();
+ } catch (NullPointerException npe) {
+ return null;
+ }
+ }
+
+ private void remove(int partitionId) {
+ Queue<byte[]> messages = queues.get(partitionId);
+
+ if (messages.isEmpty()) {
+ logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
+ return;
+ }
+
+ if (messages != null)
+ messages.remove();
+ }
+ }
+
+ private HashBiMap<Integer, Channel> partitionChannelMap;
+ private HashBiMap<Integer, ClusterNode> partitionNodeMap;
+ private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+
+ @Inject
+ public TCPEmitter(Topology topology) throws InterruptedException {
+ this.topology = topology;
+ topology.addListener(this);
+ int clusterSize = this.topology.getTopology().getNodes().size();
+
+ partitionChannelMap = HashBiMap.create(clusterSize);
+ partitionNodeMap = HashBiMap.create(clusterSize);
+
+ ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ bootstrap = new ClientBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("1", new LengthFieldPrepender(4));
+ p.addLast("2", new TestHandler());
+ return p;
+ }
+ });
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ }
+
+ private boolean connectTo(Integer partitionId) {
+ ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+
+ if (clusterNode == null) {
+ clusterNode = topology.getTopology().getNodes().get(partitionId);
+ partitionNodeMap.forcePut(partitionId, clusterNode);
+ }
+
+ if (clusterNode == null) {
+ logger.error("No ClusterNode exists for partitionId " + partitionId);
+ return false;
+ }
+
+ for (int retries = 0; retries < NUM_RETRIES; retries++) {
+ ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
+ .getPort()));
+ f.awaitUninterruptibly();
+ if (f.isSuccess()) {
+ partitionChannelMap.forcePut(partitionId, f.getChannel());
+ return true;
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
+ clusterNode.getPort()));
+ }
+ }
+
+ return false;
+ }
+
+ private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
+ ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+ buffer.writeBytes(message);
+ ChannelFuture f = channel.write(buffer);
+ f.addListener(this);
+ }
+
+ private final Object sendLock = new Object();
+
+ @Override
+ public boolean send(int partitionId, byte[] message) {
+ Channel channel = partitionChannelMap.get(partitionId);
+ if (channel == null) {
+ if (connectTo(partitionId)) {
+ channel = partitionChannelMap.get(partitionId);
+ } else {
+ // could not connect, queue to the partitionBuffer
+ return queuedMessages.add(partitionId, message);
+ }
+ }
+
+ /*
+ * Try limiting the size of the send queue inside Netty
+ */
+ if (!channel.isWritable()) {
+ synchronized (sendLock) {
+ // check again now that we have the lock
+ while (!channel.isWritable()) {
+ try {
+ sendLock.wait();
+ } catch (InterruptedException ie) {
+ return false;
+ }
+ }
+ }
+ }
+
+ /*
+ * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+ * buffered messages, and (3) the Current Message
+ *
+ * Once the channel returns success delete from the messagesOnTheWire
+ */
+ byte[] messageBeingSent = null;
+ // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+ // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+ // }
+
+ while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+ writeMessageToChannel(channel, partitionId, messageBeingSent);
+ queuedMessages.remove(partitionId);
+ }
+
+ writeMessageToChannel(channel, partitionId, message);
+ return true;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture f) {
+ int partitionId = partitionChannelMap.inverse().get(f.getChannel());
+ if (f.isSuccess()) {
+ // messagesOnTheWire.remove(partitionId);
+ }
+
+ if (f.isCancelled()) {
+ logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
+ } else if (!f.isSuccess()) {
+ logger.error("Exception on I/O operation", f.getCause());
+ logger.error(String.format("I/O on partition %d failed!", partitionId));
+ partitionChannelMap.remove(partitionId);
+ }
+ }
+
+ @Override
+ public void onChange() {
+ /*
+ * Close the channels that correspond to changed partitions and update partitionNodeMap
+ */
+ for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+ Integer partition = clusterNode.getPartition();
+ ClusterNode oldNode = partitionNodeMap.get(partition);
+
+ if (oldNode != null && !oldNode.equals(clusterNode)) {
+ partitionChannelMap.remove(partition).close();
+ }
+
+ partitionNodeMap.forcePut(partition, clusterNode);
+ }
+ }
+
+ @Override
+ public int getPartitionCount() {
+ // Number of nodes is not same as number of partitions
+ return topology.getTopology().getPartitionCount();
+ }
+
+ class TestHandler extends SimpleChannelHandler {
+ @Override
+ public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ // logger.info(String.format("%08x %08x %08x", e.getValue(),
+ // e.getChannel().getInterestOps(), Channel.OP_WRITE));
+ synchronized (sendLock) {
+ if (e.getChannel().isWritable()) {
+ sendLock.notify();
+ }
+ }
+ ctx.sendUpstream(e);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+ Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+ if (partitionId == null) {
+ logger.error("Error on mystery channel!!");
+ }
+ logger.error("Error on channel to partition " + partitionId);
+
+ try {
+ throw event.getCause();
+ } catch (ConnectException ce) {
+ logger.error(ce.getMessage(), ce);
+ } catch (Throwable err) {
+ logger.error("Error", err);
+ if (context.getChannel().isOpen()) {
+ logger.error("Closing channel due to exception");
+ context.getChannel().close();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
new file mode 100644
index 0000000..b3776e1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -0,0 +1,100 @@
+package org.apache.s4.comm.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+
+public class TCPListener implements Listener {
+ private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
+ private ClusterNode node;
+ private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
+
+ @Inject
+ public TCPListener(Assignment assignment) {
+ // wait for an assignment
+ node = assignment.assignClusterNode();
+
+ ChannelFactory factory =
+ new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
+
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
+ p.addLast("2", new ChannelHandler(handoffQueue));
+
+ return p;
+ }
+ });
+
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+
+ bootstrap.bind(new InetSocketAddress(node.getPort()));
+ }
+
+ public byte[] recv() {
+ try {
+ return handoffQueue.take();
+ } catch (InterruptedException e) {
+ return null;
+ }
+ }
+
+ public int getPartitionId() {
+ return node.getPartition();
+ }
+
+ public class ChannelHandler extends SimpleChannelHandler {
+ private BlockingQueue<byte[]> handoffQueue;
+
+ public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
+ this.handoffQueue = handOffQueue;
+ }
+
+ public void messageReceived(ChannelHandlerContext ctx,
+ MessageEvent e) {
+ ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ try {
+ handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
+ // there's no receiver at the other end of the handoff queue
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+ logger.error("Error", event.getCause());
+ if (context.getChannel().isOpen()) {
+ logger.error("Closing channel due to exception");
+ context.getChannel().close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
index b539932..b2ee5e1 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
@@ -11,13 +11,13 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromFile;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
@@ -68,8 +68,8 @@ public class TCPCommTest extends SimpleDeliveryTest {
bind(Topology.class).to(TopologyFromFile.class);
/* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(NettyListener.class);
- bind(Emitter.class).to(NettyEmitter.class);
+ bind(Listener.class).to(TCPListener.class);
+ bind(Emitter.class).to(TCPEmitter.class);
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index b3ccf10..f5959a2 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -13,15 +13,13 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.Listener;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
import org.apache.s4.comm.topology.TopologyFromZK;
import org.apache.s4.comm.udp.UDPEmitter;
import org.apache.s4.comm.udp.UDPListener;
@@ -74,9 +72,8 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
bind(Assignment.class).to(AssignmentFromZK.class);
bind(Topology.class).to(TopologyFromZK.class);
- bind(Emitter.class).to(NettyEmitter.class);
- bind(Listener.class).to(NettyListener.class);
-
+ bind(Emitter.class).to(TCPEmitter.class);
+ bind(Listener.class).to(TCPListener.class);
}
}