You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/19 16:26:24 UTC

[8/32] git commit: Simplify TCPEmitter

Simplify TCPEmitter


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/694188f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/694188f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/694188f0

Branch: refs/heads/S4-57
Commit: 694188f0657ca610a2646c0c3e9e5a548fa5d5f7
Parents: 384e2c3
Author: Daniel Gómez Ferro <dg...@yahoo.es>
Authored: Fri Jul 13 17:43:06 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat Jul 14 13:06:02 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |  379 ++-------------
 .../org/apache/s4/comm/tcp/TCPRemoteEmitter.java   |   12 +-
 2 files changed, 50 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/694188f0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index f88d9b7..c60e483 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -1,13 +1,8 @@
 package org.apache.s4.comm.tcp;
 
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
-import java.util.Hashtable;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -28,10 +23,9 @@ import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -39,8 +33,9 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
@@ -49,7 +44,8 @@ import com.google.inject.name.Named;
  * TCPEmitter - Uses TCP to send messages across partitions. It
  * <ul>
  * <li>guarantees message delivery</li>
- * <li>preserves pair-wise message ordering; might end up sending duplicates to ensure the order</li>
+ * <li>preserves pair-wise message ordering; might end up sending duplicates to
+ * ensure the order</li>
  * <li>tolerates topology changes, partition re-mapping and network glitches</li>
  * </ul>
  * </p>
@@ -59,12 +55,15 @@ import com.google.inject.name.Named;
  * <ul>
  * <li>maintains per-partition queue of {@code Message}s</li>
  * <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
- * <li>a thread-pool is used to send the messages asynchronously to the appropriate partitions; send operations between
- * a pair of partitions are serialized</li>
- * <li>Each {@code Message} implements the {@link ChannelFutureListener} and listens on the {@link ChannelFuture}
- * corresponding to the send operation</li>
- * <li>On success, the message marks itself as sent; messages marked sent at the head of the queue are removed</li>
- * <li>On failure of a message m, 'm' and all the messages queued after 'm' are resent to preserve message ordering</li>
+ * <li>a thread-pool is used to send the messages asynchronously to the
+ * appropriate partitions; send operations between a pair of partitions are
+ * serialized</li>
+ * <li>Each {@code Message} implements the {@link ChannelFutureListener} and
+ * listens on the {@link ChannelFuture} corresponding to the send operation</li>
+ * <li>On success, the message marks itself as sent; messages marked sent at the
+ * head of the queue are removed</li>
+ * <li>On failure of a message m, 'm' and all the messages queued after 'm' are
+ * resent to preserve message ordering</li>
  * </ul>
  * </p>
  */
@@ -72,20 +71,12 @@ import com.google.inject.name.Named;
 public class TCPEmitter implements Emitter, ClusterChangeListener {
     private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
 
-    private final int numRetries;
-    private final int retryDelayMs;
     private final int nettyTimeout;
-    private final int bufferCapacity;
 
     private Cluster topology;
     private final ClientBootstrap bootstrap;
 
     /*
-     * debug information
-     */
-    private volatile int instanceId = 0;
-
-    /*
      * All channels
      */
     private final ChannelGroup channels = new DefaultChannelGroup();
@@ -93,56 +84,29 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     /*
      * Channel used to send messages to each partition
      */
-    private final HashBiMap<Integer, Channel> partitionChannelMap;
+    private final BiMap<Integer, Channel> partitionChannelMap;
 
     /*
      * Node hosting each partition
      */
-    private final HashBiMap<Integer, ClusterNode> partitionNodeMap;
+    private final BiMap<Integer, ClusterNode> partitionNodeMap;
 
-    /*
-     * Messages to be sent, stored per partition
-     */
-    private final Hashtable<Integer, SendQueue> sendQueues;
-
-    /*
-     * Thread pool to actually send messages
-     */
-    private final ExecutorService sendService;
+    // lock for synchronizing between cluster updates callbacks and other code
+    private final Lock lock;
 
     @Inject
     SerializerDeserializer serDeser;
 
-    // lock for synchronizing between cluster updates callbacks and other code
-    private final Lock lock;
-
     @Inject
-    public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
-            @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
-            @Named("comm.timeout") int timeout) throws InterruptedException {
-        this.numRetries = retries;
-        this.retryDelayMs = retryDelay;
+    public TCPEmitter(Cluster topology, @Named("comm.timeout") int timeout) throws InterruptedException {
         this.nettyTimeout = timeout;
-        this.bufferCapacity = bufferSize;
         this.topology = topology;
         this.lock = new ReentrantLock();
 
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
-        partitionChannelMap = HashBiMap.create(clusterSize);
+        partitionChannelMap = Maps.synchronizedBiMap(HashBiMap.<Integer, Channel> create(clusterSize));
         partitionNodeMap = HashBiMap.create(clusterSize);
-        sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
-
-        // Initialize sendService
-        int numCores = Runtime.getRuntime().availableProcessors();
-        sendService = Executors.newFixedThreadPool(2 * numCores,
-                new ThreadFactoryBuilder().setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
-                        .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-                            @Override
-                            public void uncaughtException(Thread paramThread, Throwable paramThrowable) {
-                                logger.error("Cannot send message", paramThrowable);
-                            }
-                        }).build());
 
         // Initialize netty related structures
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -153,7 +117,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
                 p.addLast("1", new LengthFieldPrepender(4));
-                p.addLast("2", new NotifyChannelInterestChange());
+                p.addLast("2", new ExceptionHandler());
                 return p;
             }
         });
@@ -162,188 +126,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
-
     }
 
     @Inject
     private void init() {
-        refreshCluster();
         this.topology.addListener(this);
-    }
-
-    private class Message implements ChannelFutureListener {
-        private final SendQueue sendQ;
-        private final byte[] message;
-        private boolean sendSuccess = false;
-        @Inject
-        SerializerDeserializer serDeser;
-
-        Message(SendQueue sendQ, byte[] message) {
-            this.sendQ = sendQ;
-            this.message = message;
-        }
-
-        private void sendMessage() {
-            sendQ.emitter.sendMessage(sendQ.partitionId, this);
-        }
-
-        private void messageSendFailure() {
-            logger.debug("Message send to partition {} has failed", sendQ.partitionId);
-            synchronized (sendQ.failureFound) {
-                sendQ.failureFound = true;
-            }
-            removeChannel(sendQ.partitionId);
-            sendQ.spawnSendTask();
-        }
-
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-            if (future.isSuccess()) {
-                sendSuccess = true;
-                sendQ.clearWire();
-                return;
-            }
-
-            if (future.isCancelled()) {
-                logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
-            }
-
-            // failed operation
-            messageSendFailure();
-        }
-    }
-
-    private class SendQueue {
-        private final TCPEmitter emitter;
-        private final int partitionId;
-        private final int bufferCapacity;
-        private final Queue<Message> pending; // messages to be sent
-        private final Queue<Message> wire; // messages in transit
-
-        private Integer bufferSize = 0;
-        private Boolean sending = false;
-        private Boolean failureFound = false;
-        private Boolean newMessages = false;
-
-        SendQueue(TCPEmitter emitter, int partitionId, int bufferCapacity) {
-            this.emitter = emitter;
-            this.partitionId = partitionId;
-            this.bufferCapacity = bufferCapacity;
-            this.pending = new ConcurrentLinkedQueue<Message>();
-            this.wire = new ConcurrentLinkedQueue<Message>();
-        }
-
-        private boolean lock() {
-            if (sending)
-                return false;
-
-            sending = true;
-            return true;
-        }
-
-        private void unlock() {
-            sending = false;
-        }
-
-        private boolean offer(byte[] message) {
-            Message m = new Message(this, message);
-            synchronized (bufferSize) {
-                if (bufferSize >= bufferCapacity) {
-                    return false;
-                }
-                bufferSize++;
-            }
-
-            pending.add(m);
-            spawnSendTask();
-            return true;
-
-        }
-
-        public void clearWire() {
-            while (!wire.isEmpty()) {
-                Message msg = wire.peek();
-                if (!msg.sendSuccess)
-                    return;
-                wire.remove();
-                synchronized (bufferSize) {
-                    bufferSize--;
-                }
-            }
-        }
-
-        private void spawnSendTask() {
-            // Lock before spawning a new SendTask
-            boolean acquired = lock();
-            if (acquired) {
-                try {
-                    emitter.sendService.execute(new SendTask(this));
-                } finally {
-                    unlock();
-                }
-            } else {
-                synchronized (newMessages) {
-                    newMessages = true;
-                }
-            }
-        }
-
-        private void resendWiredMessages() {
-            clearWire();
-            for (Message msg : wire) {
-                msg.sendMessage();
-            }
-        }
-
-        private void sendPendingMessages() {
-            Message msg = null;
-            while ((msg = pending.poll()) != null) {
-                msg.sendMessage();
-                wire.add(msg);
-            }
-        }
-
-        private void sendMessages() {
-            while (true) {
-                boolean resend = false;
-                synchronized (failureFound) {
-                    if (failureFound) {
-                        resend = true;
-                        failureFound = false;
-                    } else
-                        break;
-                }
-
-                if (resend)
-                    resendWiredMessages();
-            }
-
-            while (true) {
-                sendPendingMessages();
-                synchronized (newMessages) {
-                    if (newMessages) {
-                        newMessages = false;
-                        continue;
-                    } else {
-                        unlock();
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    private class SendTask implements Runnable {
-        private final SendQueue sendQ;
-
-        SendTask(SendQueue sendQ) {
-            this.sendQ = sendQ;
-        }
-
-        @Override
-        public void run() {
-            sendQ.sendMessages();
-        }
+        refreshCluster();
     }
 
     private boolean connectTo(Integer partitionId) {
@@ -365,7 +153,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                 partitionChannelMap.forcePut(partitionId, connectFuture.getChannel());
                 return true;
             }
-            Thread.sleep(retryDelayMs);
         } catch (InterruptedException ie) {
             logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
                     clusterNode.getPort()));
@@ -373,56 +160,28 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         return false;
     }
 
-    private void sendMessage(int partitionId, Message m) {
-        boolean messageSent = false;
-        ChannelBuffer buffer = ChannelBuffers.buffer(m.message.length);
-        buffer.writeBytes(m.message);
+    private void sendMessage(int partitionId, byte[] message) {
+        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+        buffer.writeBytes(message);
 
-        for (int retries = 0; retries < numRetries; retries++) {
-            if (!partitionChannelMap.containsKey(partitionId)) {
-                if (!connectTo(partitionId)) {
-                    continue;
-                }
-            }
-
-            Channel c = partitionChannelMap.get(partitionId);
-            if (c == null)
-                continue;
-            if (!c.isWritable()) {
-                try {
-                    logger.debug("Waiting for channel to partition {} to become writable", partitionId);
-                    // Though we wait for the channel to be writable, it could immediately become non-writable. Hence,
-                    // the wait is just a precaution to minimize failed writes.
-                    SendQueue sendQ = sendQueues.get(partitionId);
-                    synchronized (sendQ) {
-                        sendQ.wait();
-                    }
-                } catch (InterruptedException e) {
-                    continue;
-                }
-            }
-
-            if (c != null && c.isWritable()) {
-                c.write(buffer).addListener(m);
-                messageSent = true;
-                break;
+        if (!partitionChannelMap.containsKey(partitionId)) {
+            if (!connectTo(partitionId)) {
+                // Couldn't connect, discard message
+                return;
             }
         }
 
-        if (!messageSent) {
-            m.messageSendFailure();
-        }
+        Channel c = partitionChannelMap.get(partitionId);
+        if (c == null)
+            return;
 
+        c.write(buffer);
     }
 
     @Override
     public boolean send(int partitionId, EventMessage message) {
-        if (!sendQueues.containsKey(partitionId)) {
-            SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferCapacity);
-            sendQueues.put(partitionId, sendQueue);
-        }
-
-        return sendQueues.get(partitionId).offer(serDeser.serialize(message));
+        sendMessage(partitionId, serDeser.serialize(message));
+        return true;
     }
 
     protected void removeChannel(int partition) {
@@ -442,20 +201,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     }
 
     public void close() {
-        for (SendQueue sendQ : sendQueues.values()) {
-            if (!sendQ.wire.isEmpty()) {
-                logger.error("TCPEmitter could not deliver {} messages to partition {}", sendQ.wire.size(),
-                        sendQ.partitionId);
-                sendQ.wire.clear();
-            }
-
-            if (!sendQ.pending.isEmpty()) {
-                logger.error("TCPEmitter could not send {} messages to partition {}", sendQ.pending.size(),
-                        sendQ.partitionId);
-                sendQ.pending.clear();
-            }
-        }
-
         try {
             channels.close().await();
             bootstrap.releaseExternalResources();
@@ -496,54 +241,18 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         return topology.getPhysicalCluster().getPartitionCount();
     }
 
-    class NotifyChannelInterestChange extends SimpleChannelHandler {
+    class ExceptionHandler extends SimpleChannelUpstreamHandler {
         @Override
-        public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            Channel c = e.getChannel();
-            Integer partitionId = partitionChannelMap.inverse().get(c);
-            if (partitionId == null) {
-                logger.debug("channelInterestChanged for an unknown/deleted channel");
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+            Throwable t = e.getCause();
+            if (t instanceof ClosedChannelException) {
+                partitionChannelMap.inverse().remove(e.getChannel());
                 return;
-            }
-
-            SendQueue sendQ = sendQueues.get(partitionId);
-            synchronized (sendQ) {
-                if (c.isWritable()) {
-                    sendQ.notify();
-                }
-            }
-
-            ctx.sendUpstream(e);
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
-            try {
-                throw event.getCause();
-            } catch (ClosedChannelException cce) {
+            } else if (t instanceof ConnectException) {
+                partitionChannelMap.inverse().remove(e.getChannel());
                 return;
-            } catch (ConnectException ce) {
-                return;
-            } catch (Throwable e) {
-                e.printStackTrace();
-                // Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
-                // String target;
-                // if (partitionId == null) {
-                // target = "unknown channel";
-                // } else {
-                // target = "channel for partition [" + partitionId + "], target node host ["
-                // + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
-                // + partitionNodeMap.get(partitionId).getPort() + "]";
-                // }
-                // logger.error(
-                // "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
-                // target);
-                //
-                // if (context.getChannel().isOpen()) {
-                // logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
-                // context.getChannel().close();
-                // }
-
+            } else {
+                logger.error("Unexpected exception", t);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/694188f0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 3fd5bcf..b19d5be 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -14,14 +14,14 @@ import com.google.inject.name.Named;
 public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
 
     /**
-     * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
-     * discovered (as remote streams outputs)
+     * Sends to remote subclusters. This is dynamically created, through an
+     * injected factory, when new subclusters are discovered (as remote streams
+     * outputs)
      */
     @Inject
-    public TCPRemoteEmitter(@Assisted Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
-            @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
-            @Named("comm.timeout") int timeout) throws InterruptedException {
-        super(topology, bufferSize, retries, retryDelay, timeout);
+    public TCPRemoteEmitter(@Assisted Cluster topology, @Named("comm.timeout") int timeout)
+            throws InterruptedException {
+        super(topology, timeout);
     }
 
 }