You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:28 UTC

[31/34] git commit: Add debug log messages to Netty stack

Add debug log messages to Netty stack


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7509aaff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7509aaff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7509aaff

Branch: refs/heads/release-0.5.1
Commit: 7509aafff1378cf30779be226bb0c8af6da14724
Parents: b01038d
Author: uce <u....@fu-berlin.de>
Authored: Mon Jun 2 15:46:07 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Jun 10 21:26:35 2014 +0200

----------------------------------------------------------------------
 .../network/netty/InboundEnvelopeDecoder.java   | 11 +++++
 .../network/netty/NettyConnectionManager.java   | 52 ++++++++++++++++++++
 .../network/netty/OutboundConnectionQueue.java  | 17 ++++++-
 .../test/runtime/NetworkStackNepheleITCase.java |  6 +--
 4 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7509aaff/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
index 1ab1871..54f4617 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -23,6 +23,8 @@ import eu.stratosphere.runtime.io.network.Envelope;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -30,6 +32,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
 
+	private static final Log LOG = LogFactory.getLog(InboundEnvelopeDecoder.class);
+
 	private final BufferProviderBroker bufferProviderBroker;
 
 	private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask();
@@ -123,6 +127,10 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter impleme
 					case REGISTERED:
 						if (ctx.channel().config().isAutoRead()) {
 							ctx.channel().config().setAutoRead(false);
+
+							if (LOG.isDebugEnabled()) {
+								LOG.debug(String.format("Set channel %s auto read to false.", ctx.channel()));
+							}
 						}
 
 						this.stagedBuffer = in;
@@ -194,6 +202,9 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter impleme
 				if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
 					stagedBuffer = null;
 					channelHandlerContext.channel().config().setAutoRead(true);
+					if (LOG.isDebugEnabled()) {
+						LOG.debug(String.format("Set channel %s auto read to true.", channelHandlerContext.channel()));
+					}
 				}
 			} catch (IOException e) {
 				availableBuffer.recycleBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7509aaff/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
index ae67f42..4e3f043 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -34,6 +34,8 @@ import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Date;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -41,6 +43,8 @@ public class NettyConnectionManager {
 
 	private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
 
+	private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
+
 	private final ChannelManager channelManager;
 
 	private final ServerBootstrap in;
@@ -108,6 +112,30 @@ public class NettyConnectionManager {
 		} catch (InterruptedException e) {
 			throw new RuntimeException("Could not bind server socket for incoming connections.");
 		}
+
+		if (LOG.isDebugEnabled()) {
+			new Thread(new Runnable() {
+				@Override
+				public void run() {
+					Date date = new Date();
+
+
+					while (true) {
+						try {
+							Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
+
+							date.setTime(System.currentTimeMillis());
+							System.out.println(date);
+
+							System.out.println(getNonZeroNumQueuedEnvelopes());
+
+						} catch (InterruptedException e) {
+							e.printStackTrace();
+						}
+					}
+				}
+			}).start();
+		}
 	}
 
 	public void shutdown() {
@@ -170,6 +198,30 @@ public class NettyConnectionManager {
 		channel.enqueue(envelope);
 	}
 
+	private String getNonZeroNumQueuedEnvelopes() {
+		StringBuilder str = new StringBuilder();
+
+		str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
+
+		for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
+			RemoteReceiver receiver = entry.getKey();
+
+			Object value = entry.getValue();
+			if (value instanceof OutboundConnectionQueue) {
+				OutboundConnectionQueue queue = (OutboundConnectionQueue) value;
+				if (queue.getNumQueuedEnvelopes() > 0) {
+					str.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n",
+							Thread.currentThread().getId(), receiver, queue.getChannel(), queue.getNumQueuedEnvelopes()));
+				}
+			} else if (value instanceof ChannelInBuildup) {
+				str.append(String.format("%s> Connection to %s is still in buildup\n",
+						Thread.currentThread().getId(), receiver));
+			}
+		}
+
+		return str.toString();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class ChannelInBuildup implements ChannelFutureListener {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7509aaff/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
index c687408..b6ec915 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
 
@@ -30,11 +31,12 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 
 	private final Channel channel;
 
-	private final ArrayDeque<Envelope> queuedEnvelopes;
+	private final ArrayDeque<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+
+	private final AtomicInteger numQueued = new AtomicInteger(0);
 
 	public OutboundConnectionQueue(Channel channel) {
 		this.channel = channel;
-		this.queuedEnvelopes = new ArrayDeque<Envelope>();
 
 		channel.pipeline().addFirst(this);
 	}
@@ -56,6 +58,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 		boolean triggerWrite = this.queuedEnvelopes.isEmpty();
 
 		this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+		this.numQueued.incrementAndGet();
 
 		if (triggerWrite) {
 			writeAndFlushNextEnvelopeIfPossible();
@@ -80,9 +83,19 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 		}
 	}
 
+	public int getNumQueuedEnvelopes() {
+		return this.numQueued.intValue();
+	}
+
+	public Channel getChannel() {
+		return this.channel;
+	}
+
 	private void writeAndFlushNextEnvelopeIfPossible() {
 		if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
 			Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+			this.numQueued.decrementAndGet();
+
 			this.channel.writeAndFlush(nextEnvelope).addListener(this);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7509aaff/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
index f5beda4..64026a2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
@@ -162,8 +162,8 @@ public class NetworkStackNepheleITCase extends RecordAPITestBase {
 			producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 			forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 
-			producer.setVertexToShareInstancesWith(forwarder);
-			forwarder.setVertexToShareInstancesWith(consumer);
+			forwarder.setVertexToShareInstancesWith(producer);
+			consumer.setVertexToShareInstancesWith(producer);
 		}
 		else {
 			producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -196,7 +196,7 @@ public class NetworkStackNepheleITCase extends RecordAPITestBase {
 
 			LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
 					getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
-					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask/1024.0));
+					SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
 
 			boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);