You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/19 16:26:24 UTC
[8/32] git commit: Simplify TCPEmitter
Simplify TCPEmitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/694188f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/694188f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/694188f0
Branch: refs/heads/S4-57
Commit: 694188f0657ca610a2646c0c3e9e5a548fa5d5f7
Parents: 384e2c3
Author: Daniel Gómez Ferro <dg...@yahoo.es>
Authored: Fri Jul 13 17:43:06 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat Jul 14 13:06:02 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 379 ++-------------
.../org/apache/s4/comm/tcp/TCPRemoteEmitter.java | 12 +-
2 files changed, 50 insertions(+), 341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/694188f0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index f88d9b7..c60e483 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -1,13 +1,8 @@
package org.apache.s4.comm.tcp;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
-import java.util.Hashtable;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -28,10 +23,9 @@ import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -39,8 +33,9 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.name.Named;
@@ -49,7 +44,8 @@ import com.google.inject.name.Named;
* TCPEmitter - Uses TCP to send messages across partitions. It
* <ul>
* <li>guarantees message delivery</li>
- * <li>preserves pair-wise message ordering; might end up sending duplicates to ensure the order</li>
+ * <li>preserves pair-wise message ordering; might end up sending duplicates to
+ * ensure the order</li>
* <li>tolerates topology changes, partition re-mapping and network glitches</li>
* </ul>
* </p>
@@ -59,12 +55,15 @@ import com.google.inject.name.Named;
* <ul>
* <li>maintains per-partition queue of {@code Message}s</li>
* <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
- * <li>a thread-pool is used to send the messages asynchronously to the appropriate partitions; send operations between
- * a pair of partitions are serialized</li>
- * <li>Each {@code Message} implements the {@link ChannelFutureListener} and listens on the {@link ChannelFuture}
- * corresponding to the send operation</li>
- * <li>On success, the message marks itself as sent; messages marked sent at the head of the queue are removed</li>
- * <li>On failure of a message m, 'm' and all the messages queued after 'm' are resent to preserve message ordering</li>
+ * <li>a thread-pool is used to send the messages asynchronously to the
+ * appropriate partitions; send operations between a pair of partitions are
+ * serialized</li>
+ * <li>Each {@code Message} implements the {@link ChannelFutureListener} and
+ * listens on the {@link ChannelFuture} corresponding to the send operation</li>
+ * <li>On success, the message marks itself as sent; messages marked sent at the
+ * head of the queue are removed</li>
+ * <li>On failure of a message m, 'm' and all the messages queued after 'm' are
+ * resent to preserve message ordering</li>
* </ul>
* </p>
*/
@@ -72,20 +71,12 @@ import com.google.inject.name.Named;
public class TCPEmitter implements Emitter, ClusterChangeListener {
private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
- private final int numRetries;
- private final int retryDelayMs;
private final int nettyTimeout;
- private final int bufferCapacity;
private Cluster topology;
private final ClientBootstrap bootstrap;
/*
- * debug information
- */
- private volatile int instanceId = 0;
-
- /*
* All channels
*/
private final ChannelGroup channels = new DefaultChannelGroup();
@@ -93,56 +84,29 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
/*
* Channel used to send messages to each partition
*/
- private final HashBiMap<Integer, Channel> partitionChannelMap;
+ private final BiMap<Integer, Channel> partitionChannelMap;
/*
* Node hosting each partition
*/
- private final HashBiMap<Integer, ClusterNode> partitionNodeMap;
+ private final BiMap<Integer, ClusterNode> partitionNodeMap;
- /*
- * Messages to be sent, stored per partition
- */
- private final Hashtable<Integer, SendQueue> sendQueues;
-
- /*
- * Thread pool to actually send messages
- */
- private final ExecutorService sendService;
+ // lock for synchronizing between cluster updates callbacks and other code
+ private final Lock lock;
@Inject
SerializerDeserializer serDeser;
- // lock for synchronizing between cluster updates callbacks and other code
- private final Lock lock;
-
@Inject
- public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
- @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
- @Named("comm.timeout") int timeout) throws InterruptedException {
- this.numRetries = retries;
- this.retryDelayMs = retryDelay;
+ public TCPEmitter(Cluster topology, @Named("comm.timeout") int timeout) throws InterruptedException {
this.nettyTimeout = timeout;
- this.bufferCapacity = bufferSize;
this.topology = topology;
this.lock = new ReentrantLock();
// Initialize data structures
int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
- partitionChannelMap = HashBiMap.create(clusterSize);
+ partitionChannelMap = Maps.synchronizedBiMap(HashBiMap.<Integer, Channel> create(clusterSize));
partitionNodeMap = HashBiMap.create(clusterSize);
- sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
-
- // Initialize sendService
- int numCores = Runtime.getRuntime().availableProcessors();
- sendService = Executors.newFixedThreadPool(2 * numCores,
- new ThreadFactoryBuilder().setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
- .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread paramThread, Throwable paramThrowable) {
- logger.error("Cannot send message", paramThrowable);
- }
- }).build());
// Initialize netty related structures
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -153,7 +117,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
public ChannelPipeline getPipeline() {
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new LengthFieldPrepender(4));
- p.addLast("2", new NotifyChannelInterestChange());
+ p.addLast("2", new ExceptionHandler());
return p;
}
});
@@ -162,188 +126,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
-
}
@Inject
private void init() {
- refreshCluster();
this.topology.addListener(this);
- }
-
- private class Message implements ChannelFutureListener {
- private final SendQueue sendQ;
- private final byte[] message;
- private boolean sendSuccess = false;
- @Inject
- SerializerDeserializer serDeser;
-
- Message(SendQueue sendQ, byte[] message) {
- this.sendQ = sendQ;
- this.message = message;
- }
-
- private void sendMessage() {
- sendQ.emitter.sendMessage(sendQ.partitionId, this);
- }
-
- private void messageSendFailure() {
- logger.debug("Message send to partition {} has failed", sendQ.partitionId);
- synchronized (sendQ.failureFound) {
- sendQ.failureFound = true;
- }
- removeChannel(sendQ.partitionId);
- sendQ.spawnSendTask();
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- sendSuccess = true;
- sendQ.clearWire();
- return;
- }
-
- if (future.isCancelled()) {
- logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
- }
-
- // failed operation
- messageSendFailure();
- }
- }
-
- private class SendQueue {
- private final TCPEmitter emitter;
- private final int partitionId;
- private final int bufferCapacity;
- private final Queue<Message> pending; // messages to be sent
- private final Queue<Message> wire; // messages in transit
-
- private Integer bufferSize = 0;
- private Boolean sending = false;
- private Boolean failureFound = false;
- private Boolean newMessages = false;
-
- SendQueue(TCPEmitter emitter, int partitionId, int bufferCapacity) {
- this.emitter = emitter;
- this.partitionId = partitionId;
- this.bufferCapacity = bufferCapacity;
- this.pending = new ConcurrentLinkedQueue<Message>();
- this.wire = new ConcurrentLinkedQueue<Message>();
- }
-
- private boolean lock() {
- if (sending)
- return false;
-
- sending = true;
- return true;
- }
-
- private void unlock() {
- sending = false;
- }
-
- private boolean offer(byte[] message) {
- Message m = new Message(this, message);
- synchronized (bufferSize) {
- if (bufferSize >= bufferCapacity) {
- return false;
- }
- bufferSize++;
- }
-
- pending.add(m);
- spawnSendTask();
- return true;
-
- }
-
- public void clearWire() {
- while (!wire.isEmpty()) {
- Message msg = wire.peek();
- if (!msg.sendSuccess)
- return;
- wire.remove();
- synchronized (bufferSize) {
- bufferSize--;
- }
- }
- }
-
- private void spawnSendTask() {
- // Lock before spawning a new SendTask
- boolean acquired = lock();
- if (acquired) {
- try {
- emitter.sendService.execute(new SendTask(this));
- } finally {
- unlock();
- }
- } else {
- synchronized (newMessages) {
- newMessages = true;
- }
- }
- }
-
- private void resendWiredMessages() {
- clearWire();
- for (Message msg : wire) {
- msg.sendMessage();
- }
- }
-
- private void sendPendingMessages() {
- Message msg = null;
- while ((msg = pending.poll()) != null) {
- msg.sendMessage();
- wire.add(msg);
- }
- }
-
- private void sendMessages() {
- while (true) {
- boolean resend = false;
- synchronized (failureFound) {
- if (failureFound) {
- resend = true;
- failureFound = false;
- } else
- break;
- }
-
- if (resend)
- resendWiredMessages();
- }
-
- while (true) {
- sendPendingMessages();
- synchronized (newMessages) {
- if (newMessages) {
- newMessages = false;
- continue;
- } else {
- unlock();
- break;
- }
- }
- }
- }
- }
-
- private class SendTask implements Runnable {
- private final SendQueue sendQ;
-
- SendTask(SendQueue sendQ) {
- this.sendQ = sendQ;
- }
-
- @Override
- public void run() {
- sendQ.sendMessages();
- }
+ refreshCluster();
}
private boolean connectTo(Integer partitionId) {
@@ -365,7 +153,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
partitionChannelMap.forcePut(partitionId, connectFuture.getChannel());
return true;
}
- Thread.sleep(retryDelayMs);
} catch (InterruptedException ie) {
logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
clusterNode.getPort()));
@@ -373,56 +160,28 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
return false;
}
- private void sendMessage(int partitionId, Message m) {
- boolean messageSent = false;
- ChannelBuffer buffer = ChannelBuffers.buffer(m.message.length);
- buffer.writeBytes(m.message);
+ private void sendMessage(int partitionId, byte[] message) {
+ ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
+ buffer.writeBytes(message);
- for (int retries = 0; retries < numRetries; retries++) {
- if (!partitionChannelMap.containsKey(partitionId)) {
- if (!connectTo(partitionId)) {
- continue;
- }
- }
-
- Channel c = partitionChannelMap.get(partitionId);
- if (c == null)
- continue;
- if (!c.isWritable()) {
- try {
- logger.debug("Waiting for channel to partition {} to become writable", partitionId);
- // Though we wait for the channel to be writable, it could immediately become non-writable. Hence,
- // the wait is just a precaution to minimize failed writes.
- SendQueue sendQ = sendQueues.get(partitionId);
- synchronized (sendQ) {
- sendQ.wait();
- }
- } catch (InterruptedException e) {
- continue;
- }
- }
-
- if (c != null && c.isWritable()) {
- c.write(buffer).addListener(m);
- messageSent = true;
- break;
+ if (!partitionChannelMap.containsKey(partitionId)) {
+ if (!connectTo(partitionId)) {
+ // Couldn't connect, discard message
+ return;
}
}
- if (!messageSent) {
- m.messageSendFailure();
- }
+ Channel c = partitionChannelMap.get(partitionId);
+ if (c == null)
+ return;
+ c.write(buffer);
}
@Override
public boolean send(int partitionId, EventMessage message) {
- if (!sendQueues.containsKey(partitionId)) {
- SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferCapacity);
- sendQueues.put(partitionId, sendQueue);
- }
-
- return sendQueues.get(partitionId).offer(serDeser.serialize(message));
+ sendMessage(partitionId, serDeser.serialize(message));
+ return true;
}
protected void removeChannel(int partition) {
@@ -442,20 +201,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
}
public void close() {
- for (SendQueue sendQ : sendQueues.values()) {
- if (!sendQ.wire.isEmpty()) {
- logger.error("TCPEmitter could not deliver {} messages to partition {}", sendQ.wire.size(),
- sendQ.partitionId);
- sendQ.wire.clear();
- }
-
- if (!sendQ.pending.isEmpty()) {
- logger.error("TCPEmitter could not send {} messages to partition {}", sendQ.pending.size(),
- sendQ.partitionId);
- sendQ.pending.clear();
- }
- }
-
try {
channels.close().await();
bootstrap.releaseExternalResources();
@@ -496,54 +241,18 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
return topology.getPhysicalCluster().getPartitionCount();
}
- class NotifyChannelInterestChange extends SimpleChannelHandler {
+ class ExceptionHandler extends SimpleChannelUpstreamHandler {
@Override
- public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
- Channel c = e.getChannel();
- Integer partitionId = partitionChannelMap.inverse().get(c);
- if (partitionId == null) {
- logger.debug("channelInterestChanged for an unknown/deleted channel");
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ Throwable t = e.getCause();
+ if (t instanceof ClosedChannelException) {
+ partitionChannelMap.inverse().remove(e.getChannel());
return;
- }
-
- SendQueue sendQ = sendQueues.get(partitionId);
- synchronized (sendQ) {
- if (c.isWritable()) {
- sendQ.notify();
- }
- }
-
- ctx.sendUpstream(e);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
- try {
- throw event.getCause();
- } catch (ClosedChannelException cce) {
+ } else if (t instanceof ConnectException) {
+ partitionChannelMap.inverse().remove(e.getChannel());
return;
- } catch (ConnectException ce) {
- return;
- } catch (Throwable e) {
- e.printStackTrace();
- // Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
- // String target;
- // if (partitionId == null) {
- // target = "unknown channel";
- // } else {
- // target = "channel for partition [" + partitionId + "], target node host ["
- // + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
- // + partitionNodeMap.get(partitionId).getPort() + "]";
- // }
- // logger.error(
- // "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
- // target);
- //
- // if (context.getChannel().isOpen()) {
- // logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
- // context.getChannel().close();
- // }
-
+ } else {
+ logger.error("Unexpected exception", t);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/694188f0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 3fd5bcf..b19d5be 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -14,14 +14,14 @@ import com.google.inject.name.Named;
public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
/**
- * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
- * discovered (as remote streams outputs)
+ * Sends to remote subclusters. This is dynamically created, through an
+ * injected factory, when new subclusters are discovered (as remote streams
+ * outputs)
*/
@Inject
- public TCPRemoteEmitter(@Assisted Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
- @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
- @Named("comm.timeout") int timeout) throws InterruptedException {
- super(topology, bufferSize, retries, retryDelay, timeout);
+ public TCPRemoteEmitter(@Assisted Cluster topology, @Named("comm.timeout") int timeout)
+ throws InterruptedException {
+ super(topology, timeout);
}
}