You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/13 09:57:35 UTC

[flink] branch release-1.5 updated (fa4b6e1 -> d5ddb0f)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fa4b6e1  [hotfix] [docs] Fix ProcessWindowFunction code snippets.
     new 709f2e9  [hotfix][network] add task name to SingleInputGate logs
     new 7a86b11  [hotfix][checkstyle] fix comments in SingleInputGate
     new d5ddb0f  [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/partition/consumer/InputGate.java   |  2 +
 .../partition/consumer/SingleInputGate.java        | 23 +++++---
 .../network/partition/consumer/UnionInputGate.java |  6 ++
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 64 +++++++++++++++-------
 .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++
 .../flink/streaming/runtime/io/MockInputGate.java  | 12 ++++
 6 files changed, 91 insertions(+), 28 deletions(-)


[flink] 02/03: [hotfix][checkstyle] fix comments in SingleInputGate

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a86b117b83438d2ceb5bafdcaa7dd17e6834fd2
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Aug 2 15:14:47 2018 +0200

    [hotfix][checkstyle] fix comments in SingleInputGate
---
 .../runtime/io/network/partition/consumer/SingleInputGate.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b6380af..dbef46f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -62,10 +62,10 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -74,7 +74,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
  * subpartitions.
  *
@@ -95,7 +95,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
  * subpartitions -- one for each parallel reduce subtask.
  */


[flink] 01/03: [hotfix][network] add task name to SingleInputGate logs

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 709f2e93b1472f6709e81fd14d43d796da0f9969
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Jul 31 14:49:16 2018 +0200

    [hotfix][network] add task name to SingleInputGate logs
---
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 06e80ff..b6380af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -364,7 +364,7 @@ public class SingleInputGate implements InputGate {
 					throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
 				}
 
-				LOG.debug("Updated unknown input channel to {}.", newChannel);
+				LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
 
 				inputChannels.put(partitionId, newChannel);
 
@@ -393,7 +393,7 @@ public class SingleInputGate implements InputGate {
 
 				checkNotNull(ch, "Unknown input channel with ID " + partitionId);
 
-				LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
+				LOG.debug("{}: Retriggering partition request {}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex);
 
 				if (ch.getClass() == RemoteInputChannel.class) {
 					final RemoteInputChannel rch = (RemoteInputChannel) ch;
@@ -432,7 +432,8 @@ public class SingleInputGate implements InputGate {
 							inputChannel.releaseAllResources();
 						}
 						catch (IOException e) {
-							LOG.warn("Error during release of channel resources: " + e.getMessage(), e);
+							LOG.warn("{}: Error during release of channel resources: {}.",
+								owningTaskName, e.getMessage(), e);
 						}
 					}
 
@@ -725,7 +726,8 @@ public class SingleInputGate implements InputGate {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+		LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			owningTaskName,
 			inputChannels.length,
 			numLocalChannels,
 			numRemoteChannels,


[flink] 03/03: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5ddb0fbdc41fa8230cff7c54b993e62ddef2447
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Jul 31 14:48:58 2018 +0200

    [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
    
    This closes #6470.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +
 .../partition/consumer/SingleInputGate.java        |  5 ++
 .../network/partition/consumer/UnionInputGate.java |  6 ++
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 64 +++++++++++++++-------
 .../runtime/io/BarrierBufferMassiveRandomTest.java | 12 ++++
 .../flink/streaming/runtime/io/MockInputGate.java  | 12 ++++
 6 files changed, 81 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa..c78abb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@ public interface InputGate {
 
 	int getNumberOfInputChannels();
 
+	String getOwningTaskName();
+
 	boolean isFinished();
 
 	void requestPartitions() throws IOException, InterruptedException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index dbef46f..2e7d076 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -274,6 +274,11 @@ public class SingleInputGate implements InputGate {
 		return 0;
 	}
 
+	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	// ------------------------------------------------------------------------
 	// Setup/Life-cycle
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a..d3085cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -130,6 +130,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
 	}
 
 	@Override
+	public String getOwningTaskName() {
+		// all input gates have the same owning task
+		return inputGates[0].getOwningTaskName();
+	}
+
+	@Override
 	public boolean isFinished() {
 		for (InputGate inputGate : inputGates) {
 			if (!inputGate.isFinished()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b8..991635a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	private void completeBufferedSequence() throws IOException {
-		LOG.debug("Finished feeding back buffered data");
+		LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
 
 		currentBuffered.cleanup();
 		currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we did not complete the current checkpoint, another started before
-				LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// let the task know we are not completing this
 				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
 			// actually trigger checkpoint
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received all barriers, triggering checkpoint {} at {}",
-						receivedBarrier.getId(), receivedBarrier.getTimestamp());
+				LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
+					inputGate.getOwningTaskName(),
+					receivedBarrier.getId(),
+					receivedBarrier.getTimestamp());
 			}
 
 			releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (barrierId == currentCheckpointId) {
 				// cancel this alignment
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+					LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
+						inputGate.getOwningTaskName(),
+						barrierId);
 				}
 
 				releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			}
 			else if (barrierId > currentCheckpointId) {
 				// we canceled the next which also cancels the current
-				LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
-						"Skipping current checkpoint.", barrierId, currentCheckpointId);
+				LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+						"Skipping current checkpoint.",
+					inputGate.getOwningTaskName(),
+					barrierId,
+					currentCheckpointId);
 
 				// this stops the current alignment
 				releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			latestAlignmentDurationNanos = 0L;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+				LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
+					inputGate.getOwningTaskName(),
+					barrierId);
 			}
 
 			notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void checkSizeLimit() throws Exception {
 		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
 			// exceeded our limit - abort this checkpoint
-			LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
-					currentCheckpointId, maxBufferedBytes);
+			LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+				inputGate.getOwningTaskName(),
+				currentCheckpointId,
+				maxBufferedBytes);
 
 			releaseBlocksAndResetBarriers();
 			notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		startOfAlignmentTimestamp = System.nanoTime();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
+			LOG.debug("{}: Starting stream alignment for checkpoint {}.",
+				inputGate.getOwningTaskName(),
+				checkpointId);
 		}
 	}
 
@@ -470,7 +486,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			numBarriersReceived++;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received barrier from channel " + channelIndex);
+				LOG.debug("{}: Received barrier from channel {}.",
+					inputGate.getOwningTaskName(),
+					channelIndex);
 			}
 		}
 		else {
@@ -483,7 +501,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * Makes sure the just written data is the next to be consumed.
 	 */
 	private void releaseBlocksAndResetBarriers() throws IOException {
-		LOG.debug("End of stream alignment, feeding buffered data back");
+		LOG.debug("{}: End of stream alignment, feeding buffered data back.",
+			inputGate.getOwningTaskName());
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
@@ -499,8 +518,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		else {
 			// uncommon case: buffered data pending
 			// push back the pending data, if we have any
-			LOG.debug("Checkpoint skipped via buffered data:" +
-					"Pushing back current alignment buffers and feeding back new alignment data first.");
+			LOG.debug("{}: Checkpoint skipped via buffered data:" +
+					"Pushing back current alignment buffers and feeding back new alignment data first.",
+				inputGate.getOwningTaskName());
 
 			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
 			BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Size of buffered data: {} bytes",
-					currentBuffered == null ? 0L : currentBuffered.size());
+			LOG.debug("{}: Size of buffered data: {} bytes",
+				inputGate.getOwningTaskName(),
+				currentBuffered == null ? 0L : currentBuffered.size());
 		}
 
 		// the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public String toString() {
-		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
-				currentCheckpointId, numBarriersReceived, numClosedChannels);
+		return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
+			inputGate.getOwningTaskName(),
+			currentCheckpointId,
+			numBarriersReceived,
+			numClosedChannels);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5e..e968101 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public class BarrierBufferMassiveRandomTest {
 		private int currentChannel = 0;
 		private long c = 0;
 
+		private final String owningTaskName;
+
 		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this(bufferPools, barrierGens, "TestTask");
+		}
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) {
 			this.numChannels = bufferPools.length;
 			this.currentBarriers = new int[numChannels];
 			this.bufferPools = bufferPools;
 			this.barrierGens = barrierGens;
+			this.owningTaskName = owningTaskName;
 		}
 
 		@Override
@@ -152,6 +159,11 @@ public class BarrierBufferMassiveRandomTest {
 		}
 
 		@Override
+		public String getOwningTaskName() {
+			return owningTaskName;
+		}
+
+		@Override
 		public boolean isFinished() {
 			return false;
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709..6400a17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@ public class MockInputGate implements InputGate {
 
 	private int closedChannels;
 
+	private final String owningTaskName;
+
 	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) {
+		this(pageSize, numChannels, bufferOrEvents, "MockTask");
+	}
+
+	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) {
 		this.pageSize = pageSize;
 		this.numChannels = numChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numChannels];
+		this.owningTaskName = owningTaskName;
 	}
 
 	@Override
@@ -62,6 +69,11 @@ public class MockInputGate implements InputGate {
 	}
 
 	@Override
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
+	@Override
 	public boolean isFinished() {
 		return bufferOrEvents.isEmpty();
 	}