You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:31:01 UTC
[26/30] git commit: Add debug log messages to Netty stack
Add debug log messages to Netty stack
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6d8fea96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6d8fea96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6d8fea96
Branch: refs/heads/master
Commit: 6d8fea9602c7cd248ac29d9d5c2c2f1ade42c73f
Parents: 4cd4a13
Author: uce <u....@fu-berlin.de>
Authored: Mon Jun 2 15:46:07 2014 +0200
Committer: StephanEwen <st...@tu-berlin.de>
Committed: Sat Jun 7 09:41:21 2014 +0200
----------------------------------------------------------------------
.../network/netty/InboundEnvelopeDecoder.java | 11 +++++
.../network/netty/NettyConnectionManager.java | 52 ++++++++++++++++++++
.../network/netty/OutboundConnectionQueue.java | 17 ++++++-
.../test/runtime/NetworkStackNepheleITCase.java | 6 +--
4 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d8fea96/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
index 1ab1871..54f4617 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -23,6 +23,8 @@ import eu.stratosphere.runtime.io.network.Envelope;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -30,6 +32,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
+ private static final Log LOG = LogFactory.getLog(InboundEnvelopeDecoder.class);
+
private final BufferProviderBroker bufferProviderBroker;
private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask();
@@ -123,6 +127,10 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter impleme
case REGISTERED:
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Set channel %s auto read to false.", ctx.channel()));
+ }
}
this.stagedBuffer = in;
@@ -194,6 +202,9 @@ public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter impleme
if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
stagedBuffer = null;
channelHandlerContext.channel().config().setAutoRead(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Set channel %s auto read to true.", channelHandlerContext.channel()));
+ }
}
} catch (IOException e) {
availableBuffer.recycleBuffer();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d8fea96/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
index ae67f42..4e3f043 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -34,6 +34,8 @@ import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Date;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -41,6 +43,8 @@ public class NettyConnectionManager {
private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
+ private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
+
private final ChannelManager channelManager;
private final ServerBootstrap in;
@@ -108,6 +112,30 @@ public class NettyConnectionManager {
} catch (InterruptedException e) {
throw new RuntimeException("Could not bind server socket for incoming connections.");
}
+
+ if (LOG.isDebugEnabled()) {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ Date date = new Date();
+
+
+ while (true) {
+ try {
+ Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
+
+ date.setTime(System.currentTimeMillis());
+ System.out.println(date);
+
+ System.out.println(getNonZeroNumQueuedEnvelopes());
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }).start();
+ }
}
public void shutdown() {
@@ -170,6 +198,30 @@ public class NettyConnectionManager {
channel.enqueue(envelope);
}
+ private String getNonZeroNumQueuedEnvelopes() {
+ StringBuilder str = new StringBuilder();
+
+ str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
+
+ for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
+ RemoteReceiver receiver = entry.getKey();
+
+ Object value = entry.getValue();
+ if (value instanceof OutboundConnectionQueue) {
+ OutboundConnectionQueue queue = (OutboundConnectionQueue) value;
+ if (queue.getNumQueuedEnvelopes() > 0) {
+ str.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n",
+ Thread.currentThread().getId(), receiver, queue.getChannel(), queue.getNumQueuedEnvelopes()));
+ }
+ } else if (value instanceof ChannelInBuildup) {
+ str.append(String.format("%s> Connection to %s is still in buildup\n",
+ Thread.currentThread().getId(), receiver));
+ }
+ }
+
+ return str.toString();
+ }
+
// ------------------------------------------------------------------------
private static final class ChannelInBuildup implements ChannelFutureListener {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d8fea96/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
index c687408..b6ec915 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
@@ -30,11 +31,12 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
private final Channel channel;
- private final ArrayDeque<Envelope> queuedEnvelopes;
+ private final ArrayDeque<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+
+ private final AtomicInteger numQueued = new AtomicInteger(0);
public OutboundConnectionQueue(Channel channel) {
this.channel = channel;
- this.queuedEnvelopes = new ArrayDeque<Envelope>();
channel.pipeline().addFirst(this);
}
@@ -56,6 +58,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
boolean triggerWrite = this.queuedEnvelopes.isEmpty();
this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+ this.numQueued.incrementAndGet();
if (triggerWrite) {
writeAndFlushNextEnvelopeIfPossible();
@@ -80,9 +83,19 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
}
}
+ public int getNumQueuedEnvelopes() {
+ return this.numQueued.intValue();
+ }
+
+ public Channel getChannel() {
+ return this.channel;
+ }
+
private void writeAndFlushNextEnvelopeIfPossible() {
if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+ this.numQueued.decrementAndGet();
+
this.channel.writeAndFlush(nextEnvelope).addListener(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d8fea96/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
index f5beda4..64026a2 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackNepheleITCase.java
@@ -162,8 +162,8 @@ public class NetworkStackNepheleITCase extends RecordAPITestBase {
producer.connectTo(forwarder, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
forwarder.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- producer.setVertexToShareInstancesWith(forwarder);
- forwarder.setVertexToShareInstancesWith(consumer);
+ forwarder.setVertexToShareInstancesWith(producer);
+ consumer.setVertexToShareInstancesWith(producer);
}
else {
producer.connectTo(consumer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -196,7 +196,7 @@ public class NetworkStackNepheleITCase extends RecordAPITestBase {
LOG.info(String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks(), numRecordsToEmit,
- SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask/1024.0));
+ SpeedTestRecord.RECORD_SIZE, dataMbPerSubtask / 1024.0));
boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false);