You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/07 12:21:06 UTC
git commit: [FLINK-1063] Revert race-introducing commit
Repository: incubator-flink
Updated Branches:
refs/heads/FLINK-1086-revert_close [created] c0c2abda5
[FLINK-1063] Revert race-introducing commit
This reverts "[FLINK-998] Close network connections when idle" [1] as
it introduced a race condition into the network stack, which might
result in a re-ordering of network envelopes.
[1] 52512636444902497e47ccbfb1cabaffb3e23343
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c0c2abda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c0c2abda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c0c2abda
Branch: refs/heads/FLINK-1086-revert_close
Commit: c0c2abda5eaaabc6291f765718d88dcbdc12d2a4
Parents: 66c1263
Author: uce <u....@fu-berlin.de>
Authored: Sat Sep 6 12:52:12 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Sun Sep 7 12:07:06 2014 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 20 +-
.../io/network/LocalConnectionManager.java | 9 +-
.../io/network/NetworkConnectionManager.java | 7 +-
.../runtime/io/network/RemoteReceiver.java | 2 +-
.../network/netty/NettyConnectionManager.java | 106 ++--
.../network/netty/OutboundConnectionQueue.java | 164 ++---
.../flink/runtime/taskmanager/TaskManager.java | 13 +-
.../netty/NettyConnectionManagerTest.java | 272 ++++-----
.../netty/OutboundConnectionQueueTest.java | 606 -------------------
.../test/runtime/NetworkStackThroughput.java | 1 -
10 files changed, 210 insertions(+), 990 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2983c63..ef5ee6e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,9 +117,14 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads";
/**
- * The minimum time in ms a channel must be idle, before it will be closed.
+ * The low water mark used in NettyConnectionManager for the Bootstrap.
*/
- public static final String TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY = "taskmanager.net.closeAfterIdleForMs";
+ public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark";
+
+ /**
+ * The high water mark used in NettyConnectionManager for the Bootstrap.
+ */
+ public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";
/**
* Parameter for the interval in which the TaskManager sends the periodic heart beat messages
@@ -353,9 +358,16 @@ public final class ConfigConstants {
public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;
/**
- * The minimum time in ms a channel must be idle, before it will be closed.
+ * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+ * will use half of the network buffer size as the low water mark.
+ */
+ public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1;
+
+ /**
+ * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+ * will use the network buffer size as the high water mark.
*/
- public static final int DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS = 10000;
+ public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
/**
* The default interval for TaskManager heart beats (2000 msecs).
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 612d6d5..645f71e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -16,14 +16,11 @@
* limitations under the License.
*/
+
package org.apache.flink.runtime.io.network;
import java.io.IOException;
-/**
- * Local dummy network connection manager used, when the task manager
- * runs in local mode without starting up the remote network stack.
- */
public class LocalConnectionManager implements NetworkConnectionManager {
@Override
@@ -35,10 +32,6 @@ public class LocalConnectionManager implements NetworkConnectionManager {
}
@Override
- public void close(RemoteReceiver receiver) {
- }
-
- @Override
public void shutdown() throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
index ceef2c7..7e3e0b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
@@ -16,21 +16,16 @@
* limitations under the License.
*/
+
package org.apache.flink.runtime.io.network;
import java.io.IOException;
-/**
- * The network connection manager is responsible to dispatch envelopes
- * to remote receivers.
- */
public interface NetworkConnectionManager {
public void start(ChannelManager channelManager) throws IOException;
public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;
- public void close(RemoteReceiver receiver);
-
public void shutdown() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 8815ea5..23d1205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
* Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
*
*/
-public class RemoteReceiver implements IOReadableWritable {
+public final class RemoteReceiver implements IOReadableWritable {
/**
* The address of the connection to the remote {@link TaskManager}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 6d7e15c..b526ab3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -68,19 +68,12 @@ public class NettyConnectionManager implements NetworkConnectionManager {
private final int highWaterMark;
- private final int closeAfterIdleForMs;
-
private ServerBootstrap in;
private Bootstrap out;
- public NettyConnectionManager(
- InetAddress bindAddress,
- int bindPort,
- int bufferSize,
- int numInThreads,
- int numOutThreads,
- int closeAfterIdleForMs) {
+ public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
+ int numOutThreads, int lowWaterMark, int highWaterMark) {
this.bindAddress = bindAddress;
this.bindPort = bindPort;
@@ -92,19 +85,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
- this.lowWaterMark = bufferSize / 2;
- this.highWaterMark = bufferSize;
-
- this.closeAfterIdleForMs = closeAfterIdleForMs;
+ this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
+ this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
}
@Override
public void start(ChannelManager channelManager) throws IOException {
- LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.",
- numInThreads, numOutThreads));
- LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.",
- lowWaterMark, highWaterMark));
- LOG.info(String.format("Close channels after idle for %d ms.", closeAfterIdleForMs));
+ LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
+ LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
final BufferProviderBroker bufferProviderBroker = channelManager;
final EnvelopeDispatcher envelopeDispatcher = channelManager;
@@ -112,16 +100,16 @@ public class NettyConnectionManager implements NetworkConnectionManager {
int numHeapArenas = 0;
int numDirectArenas = numInThreads + numOutThreads;
int pageSize = bufferSize << 1;
- int chunkSize = 16 << 20; // 16 MB
+ int chunkSize = 16 * 1 << 20; // 16 MB
// shift pageSize maxOrder times to get to chunkSize
- int maxOrder = (int) (Math.log(chunkSize / pageSize) / Math.log(2));
+ int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
PooledByteBufAllocator pooledByteBufAllocator =
new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
- "page size (bytes): %d, chunk size (bytes): %d.",
+ "page size (bytes): %d, chunk size (bytes): %d.",
numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
LOG.info(msg);
@@ -198,7 +186,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
// 3) not yet existing -> establish the channel
- final Object entry = outConnections.get(receiver);
+ final Object entry = this.outConnections.get(receiver);
final OutboundConnectionQueue channel;
if (entry != null) {
@@ -216,14 +204,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
// We create a "buildup future" and atomically add it to the map.
// Only the thread that really added it establishes the channel.
// The others need to wait on that original establisher's future.
- ChannelInBuildup inBuildup = new ChannelInBuildup(out, receiver, this, closeAfterIdleForMs);
- Object old = outConnections.putIfAbsent(receiver, inBuildup);
+ ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
+ Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
if (old == null) {
- out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
+ this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
channel = inBuildup.waitForChannel();
- Object previous = outConnections.put(receiver, channel);
+ Object previous = this.outConnections.put(receiver, channel);
if (inBuildup != previous) {
throw new IOException("Race condition during channel build up.");
@@ -237,19 +225,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
}
}
- if (!channel.enqueue(envelope)) {
- // The channel has been closed, try again.
- LOG.debug("Retry enqueue on channel: " + channel + ".");
-
- // This will either establish a new connection or use the
- // one, which has been established in the mean time.
- enqueue(envelope, receiver);
- }
- }
-
- @Override
- public void close(RemoteReceiver receiver) {
- outConnections.remove(receiver);
+ channel.enqueue(envelope);
}
@Override
@@ -278,9 +254,9 @@ public class NettyConnectionManager implements NetworkConnectionManager {
private String getNonZeroNumQueuedEnvelopes() {
StringBuilder str = new StringBuilder();
- str.append(String.format("==== %d outgoing connections ===\n", outConnections.size()));
+ str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
- for (Map.Entry<RemoteReceiver, Object> entry : outConnections.entrySet()) {
+ for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
RemoteReceiver receiver = entry.getKey();
Object value = entry.getValue();
@@ -316,52 +292,41 @@ public class NettyConnectionManager implements NetworkConnectionManager {
private final RemoteReceiver receiver;
- private final NetworkConnectionManager connectionManager;
-
- private final int closeAfterIdleMs;
-
- private ChannelInBuildup(
- Bootstrap out,
- RemoteReceiver receiver,
- NetworkConnectionManager connectionManager,
- int closeAfterIdleMs) {
-
+ private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
this.out = out;
this.receiver = receiver;
- this.connectionManager = connectionManager;
- this.closeAfterIdleMs = closeAfterIdleMs;
}
private void handInChannel(OutboundConnectionQueue c) {
- synchronized (lock) {
- channel = c;
- lock.notifyAll();
+ synchronized (this.lock) {
+ this.channel = c;
+ this.lock.notifyAll();
}
}
- private void notifyOfError(Throwable t) {
- synchronized (lock) {
- error = t;
- lock.notifyAll();
+ private void notifyOfError(Throwable error) {
+ synchronized (this.lock) {
+ this.error = error;
+ this.lock.notifyAll();
}
}
private OutboundConnectionQueue waitForChannel() throws IOException {
- synchronized (lock) {
- while (error == null && channel == null) {
+ synchronized (this.lock) {
+ while (this.error == null && this.channel == null) {
try {
- lock.wait(2000);
+ this.lock.wait(2000);
} catch (InterruptedException e) {
throw new RuntimeException("Channel buildup interrupted.");
}
}
}
- if (error != null) {
+ if (this.error != null) {
throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
}
- return channel;
+ return this.channel;
}
@Override
@@ -371,14 +336,13 @@ public class NettyConnectionManager implements NetworkConnectionManager {
LOG.debug(String.format("Channel %s connected", future.channel()));
}
- handInChannel(new OutboundConnectionQueue(
- future.channel(), receiver, connectionManager, closeAfterIdleMs));
+ handInChannel(new OutboundConnectionQueue(future.channel()));
}
- else if (numRetries > 0) {
- LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", numRetries));
+ else if (this.numRetries > 0) {
+ LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", this.numRetries));
- out.connect(receiver.getConnectionAddress()).addListener(this);
- numRetries--;
+ this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
+ this.numRetries--;
}
else {
if (future.getClass() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
index 4008e97..5a9f789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -23,137 +23,73 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
+import org.apache.flink.runtime.io.network.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
-
- private static enum QueueEvent {
- TRIGGER_WRITE
- }
+public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
private static final Logger LOG = LoggerFactory.getLogger(OutboundConnectionQueue.class);
- private final ChannelWriteListener writeListener = new ChannelWriteListener();
-
private final Channel channel;
- private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+ private final ArrayDeque<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
- private final RemoteReceiver receiver;
-
- private final NetworkConnectionManager connectionManager;
-
- // Flag to indicate whether a channel close was requested. This flag is true,
- // iff there are no queued envelopes, when the channel is idling. After a
- // successful close request, enqueue should return false.
- private boolean hasRequestedClose = false;
-
- public OutboundConnectionQueue(
- Channel channel,
- RemoteReceiver receiver,
- NetworkConnectionManager connectionManager,
- int closeAfterIdleForMs) {
+ private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0);
+ public OutboundConnectionQueue(Channel channel) {
this.channel = channel;
- this.receiver = receiver;
- this.connectionManager = connectionManager;
- channel.pipeline().addFirst("Outbound Connection Queue", this);
- channel.pipeline().addFirst("Idle State Handler",
- new IdleStateHandler(0, 0, closeAfterIdleForMs, TimeUnit.MILLISECONDS));
+ channel.pipeline().addFirst(this);
}
/**
- * Enqueues an envelope to be sent.
+ * Enqueues an envelope to be sent later.
* <p/>
* This method is always invoked by the task thread that wants the envelope sent.
- * <p/>
- * If this method returns <code>false</code>, the channel cannot be used to enqueue
- * envelopes any more and the caller needs to establish a new connection to the target.
- * The current envelope needs to be enqueued at the new channel.
*
- * @param env the envelope to be sent
- * @return true, if successfully enqueued or false, if the channel was requested to be closed
+ * @param env The envelope to be sent.
*/
- public boolean enqueue(Envelope env) {
- boolean triggerWrite;
-
- synchronized (channel) {
- if (hasRequestedClose) {
- // The caller has to ensure that the envelope gets queued to
- // a new channel.
- return false;
- }
+ public void enqueue(Envelope env) {
+ // the user event trigger ensures thread-safe hand-over of the envelope
+ this.channel.pipeline().fireUserEventTriggered(env);
+ }
- // Initiate envelope processing, after the queue state has
- // changed from empty to non-empty.
- triggerWrite = queuedEnvelopes.isEmpty();
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
+ boolean triggerWrite = this.queuedEnvelopes.isEmpty();
- queuedEnvelopes.add(env);
- }
+ this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+ this.numQueuedEnvelopes.incrementAndGet();
if (triggerWrite) {
- channel.pipeline().fireUserEventTriggered(QueueEvent.TRIGGER_WRITE);
+ writeAndFlushNextEnvelopeIfPossible();
}
+ }
- return true;
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ writeAndFlushNextEnvelopeIfPossible();
}
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
- if (event.getClass() == QueueEvent.class) {
- // Initiate envelope processing, after the queue state has
- // changed from empty to non-empty.
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
writeAndFlushNextEnvelopeIfPossible();
}
- else if (event.getClass() == IdleStateEvent.class) {
- // Channel idle => try to close
- boolean closeConnection = false;
-
- // Only close the connection, if there are no queued envelopes. We have
- // to ensure that there is no race between closing the channel and
- // enqueuing a new envelope.
- synchronized (channel) {
- if (queuedEnvelopes.isEmpty() && !hasRequestedClose) {
-
- hasRequestedClose = true;
-
- closeConnection = true;
-
- // Notify the connection manager that this channel has been
- // closed.
- connectionManager.close(receiver);
- }
- }
-
- if (closeConnection) {
- ctx.close().addListener(new ChannelCloseListener());
- }
+ else if (future.cause() != null) {
+ exceptionOccurred(future.cause());
}
else {
- throw new IllegalStateException("Triggered unknown event.");
+ exceptionOccurred(new Exception("Envelope send aborted."));
}
}
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- writeAndFlushNextEnvelopeIfPossible();
- }
-
public int getNumQueuedEnvelopes() {
- synchronized (channel) {
- return queuedEnvelopes.size();
- }
+ return this.numQueuedEnvelopes.intValue();
}
@Override
@@ -162,48 +98,16 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
}
private void writeAndFlushNextEnvelopeIfPossible() {
- Envelope nextEnvelope = null;
-
- synchronized (channel) {
- if (channel.isWritable() && !queuedEnvelopes.isEmpty()) {
- nextEnvelope = queuedEnvelopes.poll();
- }
- }
-
- if (nextEnvelope != null) {
- channel.writeAndFlush(nextEnvelope).addListener(writeListener);
- }
- }
-
- private class ChannelWriteListener implements ChannelFutureListener {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- writeAndFlushNextEnvelopeIfPossible();
- }
- else {
- exceptionOccurred(future.cause() == null
- ? new Exception("Envelope send aborted.")
- : future.cause());
- }
- }
- }
-
- private class ChannelCloseListener implements ChannelFutureListener {
+ if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
+ Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+ this.numQueuedEnvelopes.decrementAndGet();
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- exceptionOccurred(future.cause() == null
- ? new Exception("Close failed.")
- : future.cause());
- }
+ this.channel.writeAndFlush(nextEnvelope).addListener(this);
}
}
private void exceptionOccurred(Throwable t) throws Exception {
- LOG.error(String.format("Exception in Channel %s: %s", channel, t.getMessage()));
+ LOG.error("An exception occurred in Channel {}: {}", channel, t.getMessage());
throw new Exception(t);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 4525649..9aa39d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+
package org.apache.flink.runtime.taskmanager;
import java.io.File;
@@ -317,13 +318,17 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
- int closeAfterIdleForMs = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS);
+ int lowWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
+
+ int highWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
networkConnectionManager = new NettyConnectionManager(
localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
- bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
+ bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index cfb6ae5..dfb654d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -16,16 +16,18 @@
* limitations under the License.
*/
+
package org.apache.flink.runtime.io.network.netty;
+import org.junit.Assert;
+
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Assert;
import org.mockito.Matchers;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -36,206 +38,158 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
public class NettyConnectionManagerTest {
private final static long RANDOM_SEED = 520346508276087l;
- private Random rand = new Random(RANDOM_SEED);
-
- private NettyConnectionManager senderManager;
-
- private NettyConnectionManager receiverManager;
+ private final static Random random = new Random(RANDOM_SEED);
- private ChannelManager channelManager;
+ private final static int BIND_PORT = 20000;
- private RemoteReceiver[] receivers;
+ private final static int BUFFER_SIZE = 32 * 1024;
- private CountDownLatch receivedAllEnvelopesLatch;
-
- private void initTest(
- int numProducers,
- final int numEnvelopesPerProducer,
- int numInThreads,
- int numOutThreads,
- int closeAfterIdleForMs) throws Exception {
-
- final InetAddress bindAddress = InetAddress.getLocalHost();
- final int bindPort = 20000;
- final int bufferSize = 32 * 1024;
+ public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
+ Integer[][] configs = new Integer[][]{
+ {64, 4096, 1, 1, 1},
+ {128, 2048, 1, 1, 1},
+ {256, 1024, 1, 1, 1},
+ {512, 512, 1, 1, 1},
+ {64, 4096, 4, 1, 1},
+ {128, 2048, 4, 1, 1},
+ {256, 1024, 4, 1, 1},
+ {512, 512, 4, 1, 1},
+ {64, 4096, 4, 2, 2},
+ {128, 2048, 4, 2, 2},
+ {256, 1024, 4, 2, 2},
+ {512, 512, 4, 2, 2}
+ };
- senderManager = Mockito.spy(new NettyConnectionManager(
- bindAddress, bindPort, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs));
+ for (Integer[] params : configs) {
+ System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, "
+ + "%d num channels, %d num in threads, %d num out threads.",
+ "testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4]));
- receiverManager = new NettyConnectionManager(
- bindAddress, bindPort + 1, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
+ long start = System.currentTimeMillis();
+ doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]);
+ long end = System.currentTimeMillis();
- channelManager = Mockito.mock(ChannelManager.class);
+ System.out.println(String.format("Runtime: %d ms.", (end - start)));
+ }
+ }
- senderManager.start(channelManager);
- receiverManager.start(channelManager);
+ private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels(
+ int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads)
+ throws Exception {
- receivers = new RemoteReceiver[numProducers];
- for (int i = 0; i < numProducers; i++) {
- receivers[i] = new RemoteReceiver(new InetSocketAddress(bindPort + 1), i);
- }
+ final InetAddress localhost = InetAddress.getLocalHost();
+ final CountDownLatch latch = new CountDownLatch(numSubtasks);
// --------------------------------------------------------------------
+ // setup
+ // --------------------------------------------------------------------
+ ChannelManager channelManager = mock(ChannelManager.class);
+ doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask))
+ .when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
- receivedAllEnvelopesLatch = new CountDownLatch(numProducers);
-
- final ConcurrentMap<ChannelID, Integer> receivedSequenceNums =
- new ConcurrentHashMap<ChannelID, Integer>();
+ final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE,
+ numInThreads, numOutThreads, -1, -1);
+ senderConnManager.start(channelManager);
- // Verifies that the sequence numbers of each producer are received
- // in ascending incremental order. In addition, manages a latch to
- // allow synchronization after all envelopes have been received.
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- final Envelope env = (Envelope) invocation.getArguments()[0];
+ NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE,
+ numInThreads, numOutThreads, -1, -1);
+ receiverConnManager.start(channelManager);
- final int currentSeqNum = env.getSequenceNumber();
- final ChannelID cid = env.getSource();
+ // --------------------------------------------------------------------
+ // start sender threads
+ // --------------------------------------------------------------------
+ RemoteReceiver[] receivers = new RemoteReceiver[numChannels];
- if (currentSeqNum < 0 || currentSeqNum >= numEnvelopesPerProducer) {
- Assert.fail("Received more envelopes than expected from " + cid);
- }
+ for (int i = 0; i < numChannels; i++) {
+ receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
+ }
- Integer previousSeqNum = receivedSequenceNums.put(cid, currentSeqNum);
+ for (int i = 0; i < numSubtasks; i++) {
+ final RemoteReceiver receiver = receivers[random.nextInt(numChannels)];
- if (previousSeqNum != null) {
- String errMsg = String.format("Received %s with unexpected sequence number.", env);
- Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
- }
+ final AtomicInteger seqNum = new AtomicInteger(0);
+ final JobID jobId = new JobID();
+ final ChannelID channelId = new ChannelID();
- if (currentSeqNum == numEnvelopesPerProducer - 1) {
- receivedAllEnvelopesLatch.countDown();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // enqueue envelopes with ascending seq numbers
+ while (seqNum.get() < numToSendPerSubtask) {
+ try {
+ Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId);
+ senderConnManager.enqueue(env, receiver);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception while enqueuing envelope.");
+ }
+ }
}
+ }).start();
+ }
- return null;
- }
- }).when(channelManager).dispatchFromNetwork(Matchers.any(Envelope.class));
- }
+ latch.await();
- private void finishTest() throws IOException {
- senderManager.shutdown();
- receiverManager.shutdown();
+ senderConnManager.shutdown();
+ receiverConnManager.shutdown();
}
- // ------------------------------------------------------------------------
+ /**
+ * Verifies correct ordering of received envelopes (per envelope source channel ID).
+ */
+ private class VerifyEnvelopes implements Answer<Void> {
- // Verifies that there are no race conditions or dead locks with
- // concurrent enqueues and closes.
- public void testConcurrentEnqueueAndClose() throws Exception {
- Integer[][] configs = new Integer[][]{
- // No close after idle
- {64, 4096, 1, 1, 1, 0, 0},
- {128, 2048, 1, 1, 1, 0, 0},
- {256, 1024, 1, 1, 1, 0, 0},
- {512, 512, 1, 1, 1, 0, 0},
-
- {64, 4096, 4, 1, 1, 0, 0},
- {128, 2048, 4, 1, 1, 0, 0},
- {256, 1024, 4, 1, 1, 0, 0},
- {512, 512, 4, 1, 1, 0, 0},
-
- {64, 4096, 4, 2, 2, 0, 0},
- {128, 2048, 4, 2, 2, 0, 0},
- {256, 1024, 4, 2, 2, 0, 0},
- {512, 512, 4, 2, 2, 0, 0},
- // Note: these need plenty of heap space for the threads
- {1024, 256, 4, 2, 2, 0, 0},
- {2048, 128, 4, 2, 2, 0, 0},
- {4096, 64, 4, 2, 2, 0, 0},
-
- // With close after idle
- {4, 1024, 1, 1, 1, 40, 80},
- {8, 1024, 1, 1, 1, 40, 80},
- {16, 1024, 1, 1, 1, 40, 80},
- {32, 1024, 1, 1, 1, 40, 80},
- {64, 1024, 1, 1, 1, 40, 80},
-
- {16, 1024, 4, 1, 1, 40, 80},
- {32, 1024, 4, 1, 1, 40, 80},
- {64, 1024, 4, 1, 1, 40, 80},
-
- {16, 1024, 4, 2, 2, 40, 80},
- {32, 1024, 4, 2, 2, 40, 80},
- {64, 1024, 4, 2, 2, 40, 80}
- };
+ private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
- for (Integer[] params : configs) {
- System.out.println(String.format("Running testConcurrentEnqueueAndClose with config: " +
- "%d producers, %d envelopes per producer, %d num channels, " +
- "%d num in threads, %d num out threads, " +
- "%d ms min sleep time, %d ms max sleep time.",
- params[0], params[1], params[2], params[3], params[4], params[5], params[6]));
+ private final CountDownLatch latch;
- long start = System.currentTimeMillis();
- doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3], params[4], params[5], params[6]);
- long end = System.currentTimeMillis();
+ private final int numExpectedEnvelopesPerSubtask;
- System.out.println(String.format("Runtime: %d ms.", (end - start)));
+ private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) {
+ this.latch = latch;
+ this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask;
}
- }
-
- private void doTestConcurrentEnqueueAndClose(
- int numProducers,
- final int numEnvelopesPerProducer,
- final int numChannels,
- int numInThreads,
- int numOutThreads,
- final int minSleepTimeMs,
- final int maxSleepTimeMs) throws Exception {
-
- // The idle time before a close is requested is 1/4th of the min sleep
- // time of each producer. Depending on the number of concurrent producers
- // and number of envelopes to send per producer, this will result in a
- // variable number of close requests.
- initTest(numProducers, numEnvelopesPerProducer, numInThreads, numOutThreads, minSleepTimeMs / 4);
-
- // --------------------------------------------------------------------
- Runnable[] producers = new Runnable[numProducers];
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Envelope env = (Envelope) invocation.getArguments()[0];
- for (int i = 0; i < numProducers; i++) {
- producers[i] = new Runnable() {
- @Override
- public void run() {
- final JobID jid = new JobID();
- final ChannelID cid = new ChannelID();
- final RemoteReceiver receiver = receivers[rand.nextInt(numChannels)];
+ ChannelID channelId = env.getSource();
+ int seqNum = env.getSequenceNumber();
- for (int sequenceNum = 0; sequenceNum < numEnvelopesPerProducer; sequenceNum++) {
- try {
- senderManager.enqueue(new Envelope(sequenceNum, jid, cid), receiver);
+ if (seqNum == 0) {
+ Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum);
- int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
- Thread.sleep(sleepTime);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- };
- }
+ String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId);
+ Assert.assertNull(msg, previousSeqNum);
+ }
+ else {
+ boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum);
- for (int i = 0; i < numProducers; i++) {
- new Thread(producers[i], "Producer " + i).start();
- }
+ String msg = String.format("Received seq num %d from %s, but previous was not %d.",
+ seqNum, channelId, seqNum - 1);
+ Assert.assertTrue(msg, isExpectedPreviousSeqNum);
+ }
- // --------------------------------------------------------------------
+ // count down the latch if all envelopes received for this source
+ if (seqNum == numExpectedEnvelopesPerSubtask - 1) {
+ this.latch.countDown();
+ }
- while (receivedAllEnvelopesLatch.getCount() != 0) {
- receivedAllEnvelopesLatch.await();
+ return null;
}
-
- finishTest();
}
private void runAllTests() throws Exception {
- testConcurrentEnqueueAndClose();
+ testEnqueueRaceAndDeadlockFreeMultipleChannels();
System.out.println("Done.");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
deleted file mode 100644
index e2f3a14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.flink.runtime.io.network.Envelope;
-import org.apache.flink.runtime.io.network.NetworkConnectionManager;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.junit.Assert;
-import org.mockito.Mockito;
-import org.powermock.reflect.Whitebox;
-
-import java.net.InetAddress;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-
-public class OutboundConnectionQueueTest {
-
- private final static long RANDOM_SEED = 520346508276087l;
-
- private final Object lock = new Object();
-
- private Channel channel;
-
- private NetworkConnectionManager connectionManager;
-
- private RemoteReceiver receiver;
-
- private OutboundConnectionQueue queue;
-
- private TestControlHandler controller;
-
- private TestVerificationHandler verifier;
-
- private Throwable exception;
-
- private void initTest(boolean autoTriggerWrite) {
- controller = new TestControlHandler(autoTriggerWrite);
- verifier = new TestVerificationHandler();
-
- channel = Mockito.spy(new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- exception = cause;
- super.exceptionCaught(ctx, cause);
- }
- }));
-
- connectionManager = Mockito.mock(NetworkConnectionManager.class);
-
- receiver = Mockito.mock(RemoteReceiver.class);
-
- queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
- channel.pipeline().addFirst("Test Control Handler", controller);
- channel.pipeline().addFirst("Test Verification Handler", verifier);
-
- exception = null;
-
- // The testing pipeline looks as follows:
- // - Test Verification Handler [OUT]
- // - Test Control Handler [IN]
- // - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
- // - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
- // - Exception setter [IN] [EmbeddedChannel constructor]
- }
-
- /**
- * Verifies that the channel is closed after an idle event, when
- * there are no queued envelopes.
- */
- public void testClose() throws Exception {
- initTest(false);
-
- JobID jid = new JobID();
- ChannelID cid = new ChannelID();
-
- Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
- Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
- Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
-
- controller.triggerWrite();
-
- controller.fireIdle();
-
- verifier.waitForClose();
-
- verifier.verifyEnvelopeReceived(cid, 3);
-
- Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
- }
-
- /**
- * Verifies that the channel is not closed while there are queued
- * envelopes.
- */
- public void testCloseWithQueuedEnvelopes() throws Exception {
- initTest(true);
-
- final JobID jid = new JobID();
- final ChannelID cid = new ChannelID();
- final CountDownLatch sync = verifier.waitForEnvelopes(3, cid);
-
- // Make channel not writable => envelopes are queued
- Mockito.when(channel.isWritable()).thenReturn(false);
-
- Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
- Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
- Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
-
- // Verify idle event doesn't close channel
- controller.fireIdle();
-
- Mockito.verify(connectionManager, Mockito.times(0)).close(Mockito.any(RemoteReceiver.class));
-
- Boolean hasRequestedClose = Whitebox.<Boolean>getInternalState(queue, "hasRequestedClose");
- Assert.assertFalse("Close request while envelope in flight.", hasRequestedClose);
-
- // Change writability of channel back to writable
- Mockito.when(channel.isWritable()).thenReturn(true);
- channel.pipeline().fireChannelWritabilityChanged();
-
- // Wait for the processing of queued envelopes
- while (sync.getCount() != 0) {
- sync.await();
- }
-
- verifier.verifyEnvelopeReceived(cid, 3);
-
- // Now close again
- controller.fireIdle();
- verifier.waitForClose();
-
- Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
- }
-
- /**
- * Verifies that envelopes are delegated back to the connection
- * manager after a close.
- */
- public void testEnqueueAfterClose() throws Exception {
- initTest(true);
-
- // Immediately close the channel
- controller.fireIdle();
- verifier.waitForClose();
-
- Assert.assertFalse(queue.enqueue(new Envelope(1, new JobID(), new ChannelID())));
- }
-
- /**
- * Verifies that multiple idle events are handled correctly.
- */
- public void testMultipleIdleEvents() throws Exception {
- initTest(true);
-
- controller.fireIdle();
- verifier.waitForClose();
-
- controller.fireIdle();
-
- // Second close should not cause an exception in the
- // verification handler.
- Assert.assertNull(exception);
- }
-
- /**
- * Verifies that unknown user events throw an exception.
- */
- public void testUnknownUserEvent() throws Exception {
- initTest(true);
-
- Assert.assertNull(exception);
-
- controller.context.fireUserEventTriggered("Unknown user event");
-
- Assert.assertNotNull(exception);
- Assert.assertTrue(exception instanceof IllegalStateException);
- }
-
- // ------------------------------------------------------------------------
-
- public void testConcurrentEnqueueAndClose() throws Exception {
- Integer[][] configs = new Integer[][]{
- {1, 512, 0, 0},
- {1, 512, 40, 80},
- {2, 512, 40, 80},
- {4, 512, 40, 80},
- {8, 512, 40, 80},
- {32, 512, 40, 80},
- {128, 512, 40, 80},
- {256, 512, 40, 80},
- {512, 512, 40, 80}
- };
-
- for (Integer[] params : configs) {
- System.out.println(String.format(
- "Running %s with config: %d producers, %d envelopes to send per producer, " +
- "%d ms min sleep time, %d ms max sleep time.", "testConcurrentEnqueueAndClose",
- params[0], params[1], params[2], params[3]));
-
- long start = System.currentTimeMillis();
- doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3]);
- long end = System.currentTimeMillis();
-
- System.out.println(String.format("Runtime: %d ms.", (end - start)));
- }
- }
-
- /**
- * Verifies that concurrent enqueue and close events are handled
- * correctly.
- */
- private void doTestConcurrentEnqueueAndClose(
- final int numProducers,
- final int numEnvelopesPerProducer,
- final int minSleepTimeMs,
- final int maxSleepTimeMs) throws Exception {
-
- final InetAddress bindHost = InetAddress.getLocalHost();
- final int bindPort = 20000;
-
- // Testing concurrent enqueue and close requires real TCP channels,
- // because Netty's testing EmbeddedChannel does not implement the
- // same threading model as the NioEventLoopGroup (for example there
- // is no difference between being IN and OUTSIDE of the event loop
- // thread).
-
- final ServerBootstrap in = new ServerBootstrap();
- in.group(new NioEventLoopGroup(1))
- .channel(NioServerSocketChannel.class)
- .localAddress(bindHost, bindPort)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast(new ChannelInboundHandlerAdapter());
- }
- });
-
- final Bootstrap out = new Bootstrap();
- out.group(new NioEventLoopGroup(1))
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast(new ChannelOutboundHandlerAdapter());
- }
- })
- .option(ChannelOption.TCP_NODELAY, false)
- .option(ChannelOption.SO_KEEPALIVE, true);
-
- in.bind().sync();
-
- // --------------------------------------------------------------------
-
- // The testing pipeline looks as follows:
- // - Test Verification Handler [OUT]
- // - Test Control Handler [IN]
- // - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
- // - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
-
- channel = out.connect(bindHost, bindPort).sync().channel();
-
- queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
- controller = new TestControlHandler(true);
- verifier = new TestVerificationHandler();
-
- channel.pipeline().addFirst("Test Control Handler", controller);
- channel.pipeline().addFirst("Test Verification Handler", verifier);
-
- // --------------------------------------------------------------------
-
- final Random rand = new Random(RANDOM_SEED);
-
- // Every producer works on their local reference of the queue and only
- // updates it to the new channel when enqueue returns false, which
- // should only happen if the channel has been closed.
- final ConcurrentMap<ChannelID, OutboundConnectionQueue> producerQueues =
- new ConcurrentHashMap<ChannelID, OutboundConnectionQueue>();
-
- final ChannelID[] ids = new ChannelID[numProducers];
-
- for (int i = 0; i < numProducers; i++) {
- ids[i] = new ChannelID();
-
- producerQueues.put(ids[i], queue);
- }
-
- final CountDownLatch receivedAllEnvelopesLatch =
- verifier.waitForEnvelopes(numEnvelopesPerProducer - 1, ids);
-
- final List<Channel> closedChannels = new ArrayList<Channel>();
-
- // --------------------------------------------------------------------
-
- final Runnable closer = new Runnable() {
- @Override
- public void run() {
- while (receivedAllEnvelopesLatch.getCount() != 0) {
- try {
- controller.fireIdle();
-
- // Test two idle events arriving "closely"
- // after each other
- if (rand.nextBoolean()) {
- controller.fireIdle();
- }
-
- Thread.sleep(minSleepTimeMs / 2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- };
-
- final Runnable[] producers = new Runnable[numProducers];
-
- for (int i = 0; i < numProducers; i++) {
- final int index = i;
-
- producers[i] = new Runnable() {
- @Override
- public void run() {
- final JobID jid = new JobID();
- final ChannelID cid = ids[index];
-
- for (int j = 0; j < numEnvelopesPerProducer; j++) {
- OutboundConnectionQueue localQueue = producerQueues.get(cid);
-
- try {
- // This code path is handled by the NetworkConnectionManager
- // in production to enqueue the envelope either to the current
- // channel or a new one if it was closed.
- while (!localQueue.enqueue(new Envelope(j, jid, cid))) {
- synchronized (lock) {
- if (localQueue == queue) {
- closedChannels.add(channel);
-
- channel = out.connect(bindHost, bindPort).sync().channel();
-
- queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
-
- channel.pipeline().addFirst("Test Control Handler", controller);
- channel.pipeline().addFirst("Test Verification Handler", verifier);
- }
- }
-
- producerQueues.put(cid, queue);
- localQueue = queue;
- }
-
- int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- };
- }
-
- for (int i = 0; i < numProducers; i++) {
- new Thread(producers[i], "Producer " + i).start();
- }
-
- new Thread(closer, "Closer").start();
-
- // --------------------------------------------------------------------
-
- while (receivedAllEnvelopesLatch.getCount() != 0) {
- receivedAllEnvelopesLatch.await();
- }
-
- // Final close, if the last close didn't make it.
- synchronized (lock) {
- if (channel != null) {
- controller.fireIdle();
- }
- }
-
- verifier.waitForClose();
-
- // If the producers do not sleep after each envelope, the close
- // should not make it through and no channel should have been
- // added to the list of closed channels
- if (minSleepTimeMs == 0 && maxSleepTimeMs == 0) {
- Assert.assertEquals(0, closedChannels.size());
- }
-
- for (Channel ch : closedChannels) {
- Assert.assertFalse(ch.isOpen());
- }
-
- System.out.println(closedChannels.size() + " channels were closed during execution.");
-
- out.group().shutdownGracefully().sync();
- in.group().shutdownGracefully().sync();
- }
-
- // ------------------------------------------------------------------------
-
- // Handler to control the flow of Netty's custom user events. Used
- // to fire idle events and manually trigger write events.
- @ChannelHandler.Sharable
- private static class TestControlHandler extends ChannelInboundHandlerAdapter {
-
- private final Queue<Object> events = new ArrayDeque<Object>();
-
- private final boolean forwardUserEvents;
-
- private ChannelHandlerContext context;
-
- private TestControlHandler(boolean forwardUserEvents) {
- this.forwardUserEvents = forwardUserEvents;
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- synchronized (this) {
- context = ctx;
- }
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (forwardUserEvents) {
- ctx.fireUserEventTriggered(evt);
- }
- else {
- events.add(evt);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- super.exceptionCaught(ctx, cause);
- }
-
- public void triggerWrite() {
- synchronized (this) {
- if (!events.isEmpty()) {
- context.fireUserEventTriggered(events.remove());
- }
- }
- }
-
- public void fireIdle() {
- synchronized (this) {
- context.fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT);
- }
- }
- }
-
- // Handler to verify writes and close events.
- @ChannelHandler.Sharable
- private static class TestVerificationHandler extends ChannelOutboundHandlerAdapter {
-
- private final Map<ChannelID, Integer> receivedSequenceNums = new HashMap<ChannelID, Integer>();
-
- private final Map<ChannelID, Integer> expectedSequenceNums = new HashMap<ChannelID, Integer>();
-
- private final Map<ChannelID, CountDownLatch> envelopeLatches = new HashMap<ChannelID, CountDownLatch>();
-
- private CountDownLatch closeLatch;
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- closeLatch = new CountDownLatch(1);
-
- super.handlerAdded(ctx);
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (closeLatch.getCount() == 0) {
- throw new IllegalStateException("Write on closed channel.");
- }
-
- if (msg.getClass() == Envelope.class) {
- Envelope env = (Envelope) msg;
-
- final int currentSeqNum = env.getSequenceNumber();
- final ChannelID source = env.getSource();
-
- Integer previousSeqNum = receivedSequenceNums.put(source, currentSeqNum);
-
- if (previousSeqNum != null) {
- String errMsg = String.format("Received %s with unexpected sequence number.", env);
- Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
- }
-
- promise.setSuccess();
-
- Integer expectedSeqNum = expectedSequenceNums.get(source);
- if (expectedSeqNum != null && expectedSeqNum.equals(currentSeqNum)) {
- envelopeLatches.remove(source).countDown();
- }
- }
- }
-
- @Override
- public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- if (closeLatch.getCount() == 0) {
- throw new IllegalStateException("Received multiple close events.");
- }
-
- super.close(ctx, promise);
-
- closeLatch.countDown();
- }
-
- public void waitForClose() throws InterruptedException {
- while (closeLatch.getCount() != 0) {
- closeLatch.await();
- }
- }
-
- /**
- * Verifies that envelope with expected sequence number has been
- * processed by the handler (or no envelope has been received by
- * the given source, if expected sequence number is null).
- */
- public void verifyEnvelopeReceived(ChannelID source, Integer expectedSequenceNum) {
- if (expectedSequenceNum == null && receivedSequenceNums.containsKey(source)) {
- Assert.fail("Received unexpected envelope from channel " + source);
- }
- else if (receivedSequenceNums.containsKey(source)) {
- Assert.assertEquals(expectedSequenceNum, receivedSequenceNums.get(source));
- }
- else {
- Assert.fail("Did not receive any envelope from channel " + source);
- }
- }
-
- public CountDownLatch waitForEnvelopes(Integer expectedSequenceNum, ChannelID... ids) {
- CountDownLatch latch = new CountDownLatch(ids.length);
-
- for (ChannelID id : ids) {
- expectedSequenceNums.put(id, expectedSequenceNum);
- envelopeLatches.put(id, latch);
- }
-
- return latch;
- }
- }
-
- // --------------------------------------------------------------------
-
- public void runAllTests() throws Exception {
- testClose();
- testCloseWithQueuedEnvelopes();
- testEnqueueAfterClose();
- testUnknownUserEvent();
- testMultipleIdleEvents();
- testConcurrentEnqueueAndClose();
- }
-
- public static void main(String[] args) throws Exception {
- new OutboundConnectionQueueTest().runAllTests();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c0c2abda/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index 953b5b7..7424de6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.test.runtime;
import java.io.IOException;