You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC

[1/50] [abbrv] git commit: renamed netty to tcp

Updated Branches:
  refs/heads/piper [created] 8b642d182


renamed netty to tcp


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/8b642d18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/8b642d18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/8b642d18

Branch: refs/heads/piper
Commit: 8b642d182862556539e3df4610307fdb20d66c0a
Parents: e228a8a
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Tue Dec 20 15:08:12 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Dec 21 11:12:15 2011 +0100

----------------------------------------------------------------------
 .../org/apache/s4/comm/netty/NettyEmitter.java     |  286 ---------------
 .../org/apache/s4/comm/netty/NettyListener.java    |  100 -----
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |  286 +++++++++++++++
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |  100 +++++
 .../test/java/org/apache/s4/comm/TCPCommTest.java  |    8 +-
 .../ZkBasedClusterManagementTestModule.java        |   11 +-
 6 files changed, 394 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
deleted file mode 100644
index 15ff77d..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyEmitter.java
+++ /dev/null
@@ -1,286 +0,0 @@
-package org.apache.s4.comm.netty;
-
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.Hashtable;
-import java.util.Queue;
-import java.util.concurrent.Executors;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.HashBiMap;
-import com.google.inject.Inject;
-
-public class NettyEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
-    private static final Logger logger = LoggerFactory.getLogger(NettyEmitter.class);
-    private static final int BUFFER_SIZE = 10;
-    private static final int NUM_RETRIES = 10;
-
-    private Topology topology;
-    private final ClientBootstrap bootstrap;
-
-    static class MessageQueuesPerPartition {
-        private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
-        private boolean bounded;
-
-        MessageQueuesPerPartition(boolean bounded) {
-            this.bounded = bounded;
-        }
-
-        private boolean add(int partitionId, byte[] message) {
-            Queue<byte[]> messages = queues.get(partitionId);
-
-            if (messages == null) {
-                messages = new ArrayDeque<byte[]>();
-                queues.put(partitionId, messages);
-            }
-
-            if (bounded && messages.size() >= BUFFER_SIZE) {
-                // Too many messages already queued
-                return false;
-            }
-
-            messages.offer(message);
-            return true;
-        }
-
-        private byte[] peek(int partitionId) {
-            Queue<byte[]> messages = queues.get(partitionId);
-
-            try {
-                return messages.peek();
-            } catch (NullPointerException npe) {
-                return null;
-            }
-        }
-
-        private void remove(int partitionId) {
-            Queue<byte[]> messages = queues.get(partitionId);
-
-            if (messages.isEmpty()) {
-                logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
-                return;
-            }
-
-            if (messages != null)
-                messages.remove();
-        }
-    }
-
-    private HashBiMap<Integer, Channel> partitionChannelMap;
-    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
-    private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
-
-    @Inject
-    public NettyEmitter(Topology topology) throws InterruptedException {
-        this.topology = topology;
-        topology.addListener(this);
-        int clusterSize = this.topology.getTopology().getNodes().size();
-
-        partitionChannelMap = HashBiMap.create(clusterSize);
-        partitionNodeMap = HashBiMap.create(clusterSize);
-
-        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                Executors.newCachedThreadPool());
-
-        bootstrap = new ClientBootstrap(factory);
-
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            @Override
-            public ChannelPipeline getPipeline() {
-                ChannelPipeline p = Channels.pipeline();
-                p.addLast("1", new LengthFieldPrepender(4));
-                p.addLast("2", new TestHandler());
-                return p;
-            }
-        });
-
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-    }
-
-    private boolean connectTo(Integer partitionId) {
-        ClusterNode clusterNode = partitionNodeMap.get(partitionId);
-
-        if (clusterNode == null) {
-            clusterNode = topology.getTopology().getNodes().get(partitionId);
-            partitionNodeMap.forcePut(partitionId, clusterNode);
-        }
-
-        if (clusterNode == null) {
-            logger.error("No ClusterNode exists for partitionId " + partitionId);
-            return false;
-        }
-
-        for (int retries = 0; retries < NUM_RETRIES; retries++) {
-            ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
-                    .getPort()));
-            f.awaitUninterruptibly();
-            if (f.isSuccess()) {
-                partitionChannelMap.forcePut(partitionId, f.getChannel());
-                return true;
-            }
-            try {
-                Thread.sleep(10);
-            } catch (InterruptedException ie) {
-                logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
-                        clusterNode.getPort()));
-            }
-        }
-
-        return false;
-    }
-
-    private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
-        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
-        buffer.writeBytes(message);
-        ChannelFuture f = channel.write(buffer);
-        f.addListener(this);
-    }
-
-    private final Object sendLock = new Object();
-
-    @Override
-    public boolean send(int partitionId, byte[] message) {
-        Channel channel = partitionChannelMap.get(partitionId);
-        if (channel == null) {
-            if (connectTo(partitionId)) {
-                channel = partitionChannelMap.get(partitionId);
-            } else {
-                // could not connect, queue to the partitionBuffer
-                return queuedMessages.add(partitionId, message);
-            }
-        }
-
-        /*
-         * Try limiting the size of the send queue inside Netty
-         */
-        if (!channel.isWritable()) {
-            synchronized (sendLock) {
-                // check again now that we have the lock
-                while (!channel.isWritable()) {
-                    try {
-                        sendLock.wait();
-                    } catch (InterruptedException ie) {
-                        return false;
-                    }
-                }
-            }
-        }
-
-        /*
-         * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
-         * buffered messages, and (3) the Current Message
-         * 
-         * Once the channel returns success delete from the messagesOnTheWire
-         */
-        byte[] messageBeingSent = null;
-        // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
-        // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
-        // }
-
-        while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
-            writeMessageToChannel(channel, partitionId, messageBeingSent);
-            queuedMessages.remove(partitionId);
-        }
-
-        writeMessageToChannel(channel, partitionId, message);
-        return true;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture f) {
-        int partitionId = partitionChannelMap.inverse().get(f.getChannel());
-        if (f.isSuccess()) {
-            // messagesOnTheWire.remove(partitionId);
-        }
-
-        if (f.isCancelled()) {
-            logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
-        } else if (!f.isSuccess()) {
-            logger.error("Exception on I/O operation", f.getCause());
-            logger.error(String.format("I/O on partition %d failed!", partitionId));
-            partitionChannelMap.remove(partitionId);
-        }
-    }
-
-    @Override
-    public void onChange() {
-        /*
-         * Close the channels that correspond to changed partitions and update partitionNodeMap
-         */
-        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
-            Integer partition = clusterNode.getPartition();
-            ClusterNode oldNode = partitionNodeMap.get(partition);
-
-            if (oldNode != null && !oldNode.equals(clusterNode)) {
-                partitionChannelMap.remove(partition).close();
-            }
-
-            partitionNodeMap.forcePut(partition, clusterNode);
-        }
-    }
-
-    @Override
-    public int getPartitionCount() {
-        // Number of nodes is not same as number of partitions
-        return topology.getTopology().getPartitionCount();
-    }
-
-    class TestHandler extends SimpleChannelHandler {
-        @Override
-        public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            // logger.info(String.format("%08x %08x %08x", e.getValue(),
-            // e.getChannel().getInterestOps(), Channel.OP_WRITE));
-            synchronized (sendLock) {
-                if (e.getChannel().isWritable()) {
-                    sendLock.notify();
-                }
-            }
-            ctx.sendUpstream(e);
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
-            Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
-            if (partitionId == null) {
-                logger.error("Error on mystery channel!!");
-            }
-            logger.error("Error on channel to partition " + partitionId);
-
-            try {
-                throw event.getCause();
-            } catch (ConnectException ce) {
-                logger.error(ce.getMessage(), ce);
-            } catch (Throwable err) {
-                logger.error("Error", err);
-                if (context.getChannel().isOpen()) {
-                    logger.error("Closing channel due to exception");
-                    context.getChannel().close();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
deleted file mode 100644
index 3abeb3c..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/netty/NettyListener.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.s4.comm.netty;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.s4.base.Listener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-
-public class NettyListener implements Listener {
-    private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
-    private ClusterNode node;
-    private static final Logger logger = LoggerFactory.getLogger(NettyListener.class);
-    
-    @Inject
-    public NettyListener(Assignment assignment) {
-        // wait for an assignment
-        node = assignment.assignClusterNode();
-        
-        ChannelFactory factory =
-            new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
-
-        ServerBootstrap bootstrap = new ServerBootstrap(factory);
-
-        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() {
-                ChannelPipeline p = Channels.pipeline();
-                p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
-                p.addLast("2", new ChannelHandler(handoffQueue));
-                
-                return p;
-            }
-        });
-
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.keepAlive", true);
-        
-        bootstrap.bind(new InetSocketAddress(node.getPort()));
-    }
-    
-    public byte[] recv() {
-        try {
-            return handoffQueue.take();
-        } catch (InterruptedException e) {
-        	return null;
-        }
-    }
-    
-    public int getPartitionId() {
-        return node.getPartition();
-    }
-    
-    public class ChannelHandler extends SimpleChannelHandler {
-        private BlockingQueue<byte[]> handoffQueue;
-        
-        public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
-            this.handoffQueue = handOffQueue;
-        }
-        
-        public void messageReceived(ChannelHandlerContext ctx,
-                MessageEvent e) {
-            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-            try {
-                handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
-                                                  // there's no receiver at the other end of the handoff queue
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
-        
-        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
-            logger.error("Error", event.getCause());
-            if (context.getChannel().isOpen()) {
-                logger.error("Closing channel due to exception");
-                context.getChannel().close();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
new file mode 100644
index 0000000..5960fc1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -0,0 +1,286 @@
+package org.apache.s4.comm.tcp;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Hashtable;
+import java.util.Queue;
+import java.util.concurrent.Executors;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashBiMap;
+import com.google.inject.Inject;
+
+public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
+    private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
+    private static final int BUFFER_SIZE = 10;
+    private static final int NUM_RETRIES = 10;
+
+    private Topology topology;
+    private final ClientBootstrap bootstrap;
+
+    static class MessageQueuesPerPartition {
+        private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
+        private boolean bounded;
+
+        MessageQueuesPerPartition(boolean bounded) {
+            this.bounded = bounded;
+        }
+
+        private boolean add(int partitionId, byte[] message) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            if (messages == null) {
+                messages = new ArrayDeque<byte[]>();
+                queues.put(partitionId, messages);
+            }
+
+            if (bounded && messages.size() >= BUFFER_SIZE) {
+                // Too many messages already queued
+                return false;
+            }
+
+            messages.offer(message);
+            return true;
+        }
+
+        private byte[] peek(int partitionId) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            try {
+                return messages.peek();
+            } catch (NullPointerException npe) {
+                return null;
+            }
+        }
+
+        private void remove(int partitionId) {
+            Queue<byte[]> messages = queues.get(partitionId);
+
+            if (messages.isEmpty()) {
+                logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
+                return;
+            }
+
+            if (messages != null)
+                messages.remove();
+        }
+    }
+
+    private HashBiMap<Integer, Channel> partitionChannelMap;
+    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
+    private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+
+    @Inject
+    public TCPEmitter(Topology topology) throws InterruptedException {
+        this.topology = topology;
+        topology.addListener(this);
+        int clusterSize = this.topology.getTopology().getNodes().size();
+
+        partitionChannelMap = HashBiMap.create(clusterSize);
+        partitionNodeMap = HashBiMap.create(clusterSize);
+
+        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
+
+        bootstrap = new ClientBootstrap(factory);
+
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
+            public ChannelPipeline getPipeline() {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("1", new LengthFieldPrepender(4));
+                p.addLast("2", new TestHandler());
+                return p;
+            }
+        });
+
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("keepAlive", true);
+    }
+
+    private boolean connectTo(Integer partitionId) {
+        ClusterNode clusterNode = partitionNodeMap.get(partitionId);
+
+        if (clusterNode == null) {
+            clusterNode = topology.getTopology().getNodes().get(partitionId);
+            partitionNodeMap.forcePut(partitionId, clusterNode);
+        }
+
+        if (clusterNode == null) {
+            logger.error("No ClusterNode exists for partitionId " + partitionId);
+            return false;
+        }
+
+        for (int retries = 0; retries < NUM_RETRIES; retries++) {
+            ChannelFuture f = this.bootstrap.connect(new InetSocketAddress(clusterNode.getMachineName(), clusterNode
+                    .getPort()));
+            f.awaitUninterruptibly();
+            if (f.isSuccess()) {
+                partitionChannelMap.forcePut(partitionId, f.getChannel());
+                return true;
+            }
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ie) {
+                logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
+                        clusterNode.getPort()));
+            }
+        }
+
+        return false;
+    }
+
+    private void writeMessageToChannel(Channel channel, int partitionId, byte[] message) {
+        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+        buffer.writeBytes(message);
+        ChannelFuture f = channel.write(buffer);
+        f.addListener(this);
+    }
+
+    private final Object sendLock = new Object();
+
+    @Override
+    public boolean send(int partitionId, byte[] message) {
+        Channel channel = partitionChannelMap.get(partitionId);
+        if (channel == null) {
+            if (connectTo(partitionId)) {
+                channel = partitionChannelMap.get(partitionId);
+            } else {
+                // could not connect, queue to the partitionBuffer
+                return queuedMessages.add(partitionId, message);
+            }
+        }
+
+        /*
+         * Try limiting the size of the send queue inside Netty
+         */
+        if (!channel.isWritable()) {
+            synchronized (sendLock) {
+                // check again now that we have the lock
+                while (!channel.isWritable()) {
+                    try {
+                        sendLock.wait();
+                    } catch (InterruptedException ie) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        /*
+         * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+         * buffered messages, and (3) the Current Message
+         * 
+         * Once the channel returns success delete from the messagesOnTheWire
+         */
+        byte[] messageBeingSent = null;
+        // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+        // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+        // }
+
+        while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+            writeMessageToChannel(channel, partitionId, messageBeingSent);
+            queuedMessages.remove(partitionId);
+        }
+
+        writeMessageToChannel(channel, partitionId, message);
+        return true;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture f) {
+        int partitionId = partitionChannelMap.inverse().get(f.getChannel());
+        if (f.isSuccess()) {
+            // messagesOnTheWire.remove(partitionId);
+        }
+
+        if (f.isCancelled()) {
+            logger.error("Send I/O was cancelled!! " + f.getChannel().getRemoteAddress());
+        } else if (!f.isSuccess()) {
+            logger.error("Exception on I/O operation", f.getCause());
+            logger.error(String.format("I/O on partition %d failed!", partitionId));
+            partitionChannelMap.remove(partitionId);
+        }
+    }
+
+    @Override
+    public void onChange() {
+        /*
+         * Close the channels that correspond to changed partitions and update partitionNodeMap
+         */
+        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+            Integer partition = clusterNode.getPartition();
+            ClusterNode oldNode = partitionNodeMap.get(partition);
+
+            if (oldNode != null && !oldNode.equals(clusterNode)) {
+                partitionChannelMap.remove(partition).close();
+            }
+
+            partitionNodeMap.forcePut(partition, clusterNode);
+        }
+    }
+
+    @Override
+    public int getPartitionCount() {
+        // Number of nodes is not same as number of partitions
+        return topology.getTopology().getPartitionCount();
+    }
+
+    class TestHandler extends SimpleChannelHandler {
+        @Override
+        public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
+            // logger.info(String.format("%08x %08x %08x", e.getValue(),
+            // e.getChannel().getInterestOps(), Channel.OP_WRITE));
+            synchronized (sendLock) {
+                if (e.getChannel().isWritable()) {
+                    sendLock.notify();
+                }
+            }
+            ctx.sendUpstream(e);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+            Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
+            if (partitionId == null) {
+                logger.error("Error on mystery channel!!");
+            }
+            logger.error("Error on channel to partition " + partitionId);
+
+            try {
+                throw event.getCause();
+            } catch (ConnectException ce) {
+                logger.error(ce.getMessage(), ce);
+            } catch (Throwable err) {
+                logger.error("Error", err);
+                if (context.getChannel().isOpen()) {
+                    logger.error("Closing channel due to exception");
+                    context.getChannel().close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
new file mode 100644
index 0000000..b3776e1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -0,0 +1,100 @@
+package org.apache.s4.comm.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+
+public class TCPListener implements Listener {
+    private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
+    private ClusterNode node;
+    private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
+    
+    @Inject
+    public TCPListener(Assignment assignment) {
+        // wait for an assignment
+        node = assignment.assignClusterNode();
+        
+        ChannelFactory factory =
+            new NioServerSocketChannelFactory(
+                    Executors.newCachedThreadPool(),
+                    Executors.newCachedThreadPool());
+
+        ServerBootstrap bootstrap = new ServerBootstrap(factory);
+
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            public ChannelPipeline getPipeline() {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
+                p.addLast("2", new ChannelHandler(handoffQueue));
+                
+                return p;
+            }
+        });
+
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.keepAlive", true);
+        
+        bootstrap.bind(new InetSocketAddress(node.getPort()));
+    }
+    
+    public byte[] recv() {
+        try {
+            return handoffQueue.take();
+        } catch (InterruptedException e) {
+        	return null;
+        }
+    }
+    
+    public int getPartitionId() {
+        return node.getPartition();
+    }
+    
+    public class ChannelHandler extends SimpleChannelHandler {
+        private BlockingQueue<byte[]> handoffQueue;
+        
+        public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
+            this.handoffQueue = handOffQueue;
+        }
+        
+        public void messageReceived(ChannelHandlerContext ctx,
+                MessageEvent e) {
+            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+            try {
+                handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
+                                                  // there's no receiver at the other end of the handoff queue
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        
+        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
+            logger.error("Error", event.getCause());
+            if (context.getChannel().isOpen()) {
+                logger.error("Closing channel due to exception");
+                context.getChannel().close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
index b539932..b2ee5e1 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
@@ -11,13 +11,13 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromFile;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
@@ -68,8 +68,8 @@ public class TCPCommTest extends SimpleDeliveryTest {
             bind(Topology.class).to(TopologyFromFile.class);
 
             /* Use a simple UDP comm layer implementation. */
-            bind(Listener.class).to(NettyListener.class);
-            bind(Emitter.class).to(NettyEmitter.class);
+            bind(Listener.class).to(TCPListener.class);
+            bind(Emitter.class).to(TCPEmitter.class);
 
             /* The hashing function to map keys top partitions. */
             bind(Hasher.class).to(DefaultHasher.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8b642d18/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index b3ccf10..f5959a2 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -13,15 +13,13 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
 import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
 import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
 import org.apache.s4.comm.topology.TopologyFromZK;
 import org.apache.s4.comm.udp.UDPEmitter;
 import org.apache.s4.comm.udp.UDPListener;
@@ -74,9 +72,8 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
         bind(Assignment.class).to(AssignmentFromZK.class);
         bind(Topology.class).to(TopologyFromZK.class);
-        bind(Emitter.class).to(NettyEmitter.class);
-        bind(Listener.class).to(NettyListener.class);
-
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
     }
 
 }