You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/08/02 16:59:50 UTC

[flink] 02/02: [FLINK-10027][DataStream API] Add logging to StreamingFileSink.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 852502b7d51f91c6f9c3479424516d0b9ae255e5
Author: kkloudas <kk...@gmail.com>
AuthorDate: Thu Aug 2 14:07:27 2018 +0200

    [FLINK-10027][DataStream API] Add logging to StreamingFileSink.
    
    This closes #6477.
---
 .../api/functions/sink/filesystem/Bucket.java      | 37 ++++++++++++++++++++--
 .../api/functions/sink/filesystem/BucketState.java | 22 +++++++++++++
 .../api/functions/sink/filesystem/Buckets.java     | 30 +++++++++++++++++-
 3 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index fbedaac..6187e68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,6 +23,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,12 +38,13 @@ import java.util.Objects;
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
  *
  * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
- * {@link BucketAssigner Bucketer} is queried to see in which bucket this element should
- * be written to.
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
  */
 @Internal
 public class Bucket<IN, BucketID> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
 	private static final String PART_PREFIX = "part";
 
 	private final BucketID bucketId;
@@ -111,6 +115,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+
 		// we try to resume the previous in-progress file
 		if (state.hasInProgressResumableFile()) {
 			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
@@ -162,10 +167,20 @@ public class Bucket<IN, BucketID> {
 		if (committable != null) {
 			pendingPartsForCurrentCheckpoint.add(committable);
 		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} merging buckets for bucket id={}", subtaskIndex, bucketId);
+		}
 	}
 
 	void write(IN element, long currentTime) throws IOException {
 		if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
+						subtaskIndex, bucketId, element);
+			}
+
 			rollPartFile(currentTime);
 		}
 		inProgressPart.write(element, currentTime);
@@ -173,7 +188,15 @@ public class Bucket<IN, BucketID> {
 
 	private void rollPartFile(final long currentTime) throws IOException {
 		closePartFile();
-		inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);
+
+		final Path partFilePath = assembleNewPartPath();
+		inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
+					subtaskIndex, partFilePath.getName(), bucketId);
+		}
+
 		partCounter++;
 	}
 
@@ -213,6 +236,9 @@ public class Bucket<IN, BucketID> {
 
 	private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
 		if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
+			}
 			closePartFile();
 		}
 
@@ -242,6 +268,11 @@ public class Bucket<IN, BucketID> {
 
 	void onProcessingTime(long timestamp) throws IOException {
 		if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
+						"(in-progress file created @ {}, last updated @ {} and current time is {}).",
+						subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
+			}
 			closePartFile();
 		}
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 18ef32f9..1829381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -96,4 +96,26 @@ class BucketState<BucketID> {
 	Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
 		return committableFilesPerCheckpoint;
 	}
+
+	@Override
+	public String toString() {
+		final StringBuilder strBuilder = new StringBuilder();
+
+		strBuilder
+				.append("BucketState for bucketId=").append(bucketId)
+				.append(" and bucketPath=").append(bucketPath);
+
+		if (hasInProgressResumableFile()) {
+			strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
+		}
+
+		if (!committableFilesPerCheckpoint.isEmpty()) {
+			strBuilder.append(", has pending files for checkpoints: {");
+			for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+				strBuilder.append(checkpointId).append(' ');
+			}
+			strBuilder.append('}');
+		}
+		return strBuilder.toString();
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 405285e..2aca841 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -27,6 +27,9 @@ import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -47,6 +50,8 @@ import java.util.Map;
 @Internal
 public class Buckets<IN, BucketID> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
+
 	// ------------------------ configuration fields --------------------------
 
 	private final Path basePath;
@@ -102,7 +107,13 @@ public class Buckets<IN, BucketID> {
 		this.activeBuckets = new HashMap<>();
 		this.bucketerContext = new Buckets.BucketerContext();
 
-		this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+		try {
+			this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+		} catch (IOException e) {
+			LOG.error("Unable to create filesystem for path: {}", basePath);
+			throw e;
+		}
+
 		this.bucketStateSerializer = new BucketStateSerializer<>(
 				fsWriter.getResumeRecoverableSerializer(),
 				fsWriter.getCommitRecoverableSerializer(),
@@ -129,7 +140,11 @@ public class Buckets<IN, BucketID> {
 	 * in-progress/pending part files
 	 */
 	void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {
+
 		initializePartCounter(partCounterState);
+
+		LOG.info("Subtask {} initializing its state (max part counter={}).", subtaskIndex, maxPartCounter);
+
 		initializeActiveBuckets(bucketStates);
 	}
 
@@ -153,6 +168,10 @@ public class Buckets<IN, BucketID> {
 	private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
 		final BucketID bucketId = recoveredState.getBucketId();
 
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
+		}
+
 		final Bucket<IN, BucketID> restoredBucket = bucketFactory
 				.restoreBucket(
 						fsWriter,
@@ -179,6 +198,8 @@ public class Buckets<IN, BucketID> {
 		final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
 				activeBuckets.entrySet().iterator();
 
+		LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
+
 		while (activeBucketIt.hasNext()) {
 			final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
 			bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
@@ -200,6 +221,9 @@ public class Buckets<IN, BucketID> {
 				fsWriter != null && bucketStateSerializer != null,
 				"sink has not been initialized");
 
+		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
+				subtaskIndex, checkpointId, maxPartCounter);
+
 		snapshotActiveBuckets(checkpointId, bucketStatesContainer);
 		partCounterStateContainer.add(maxPartCounter);
 	}
@@ -215,6 +239,10 @@ public class Buckets<IN, BucketID> {
 					.writeVersionAndSerialize(bucketStateSerializer, bucketState);
 
 			bucketStatesContainer.add(serializedBucketState);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
+			}
 		}
 	}