You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/08/02 16:59:50 UTC
[flink] 02/02: [FLINK-10027][DataStream API] Add logging to StreamingFileSink.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 852502b7d51f91c6f9c3479424516d0b9ae255e5
Author: kkloudas <kk...@gmail.com>
AuthorDate: Thu Aug 2 14:07:27 2018 +0200
[FLINK-10027][DataStream API] Add logging to StreamingFileSink.
This closes #6477.
---
.../api/functions/sink/filesystem/Bucket.java | 37 ++++++++++++++++++++--
.../api/functions/sink/filesystem/BucketState.java | 22 +++++++++++++
.../api/functions/sink/filesystem/Buckets.java | 30 +++++++++++++++++-
3 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index fbedaac..6187e68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,6 +23,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,12 +38,13 @@ import java.util.Objects;
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
* <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
- * {@link BucketAssigner Bucketer} is queried to see in which bucket this element should
- * be written to.
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
*/
@Internal
public class Bucket<IN, BucketID> {
+ private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
private static final String PART_PREFIX = "part";
private final BucketID bucketId;
@@ -111,6 +115,7 @@ public class Bucket<IN, BucketID> {
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+
// we try to resume the previous in-progress file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
@@ -162,10 +167,20 @@ public class Bucket<IN, BucketID> {
if (committable != null) {
pendingPartsForCurrentCheckpoint.add(committable);
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} merging buckets for bucket id={}", subtaskIndex, bucketId);
+ }
}
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
+ subtaskIndex, bucketId, element);
+ }
+
rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
@@ -173,7 +188,15 @@ public class Bucket<IN, BucketID> {
private void rollPartFile(final long currentTime) throws IOException {
closePartFile();
- inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);
+
+ final Path partFilePath = assembleNewPartPath();
+ inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
+ subtaskIndex, partFilePath.getName(), bucketId);
+ }
+
partCounter++;
}
@@ -213,6 +236,9 @@ public class Bucket<IN, BucketID> {
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
+ }
closePartFile();
}
@@ -242,6 +268,11 @@ public class Bucket<IN, BucketID> {
void onProcessingTime(long timestamp) throws IOException {
if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
+ "(in-progress file created @ {}, last updated @ {} and current time is {}).",
+ subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
+ }
closePartFile();
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 18ef32f9..1829381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -96,4 +96,26 @@ class BucketState<BucketID> {
Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
return committableFilesPerCheckpoint;
}
+
+ @Override
+ public String toString() {
+ final StringBuilder strBuilder = new StringBuilder();
+
+ strBuilder
+ .append("BucketState for bucketId=").append(bucketId)
+ .append(" and bucketPath=").append(bucketPath);
+
+ if (hasInProgressResumableFile()) {
+ strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
+ }
+
+ if (!committableFilesPerCheckpoint.isEmpty()) {
+ strBuilder.append(", has pending files for checkpoints: {");
+ for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+ strBuilder.append(checkpointId).append(' ');
+ }
+ strBuilder.append('}');
+ }
+ return strBuilder.toString();
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 405285e..2aca841 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -27,6 +27,9 @@ import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -47,6 +50,8 @@ import java.util.Map;
@Internal
public class Buckets<IN, BucketID> {
+ private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
+
// ------------------------ configuration fields --------------------------
private final Path basePath;
@@ -102,7 +107,13 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
- this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ try {
+ this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ } catch (IOException e) {
+ LOG.error("Unable to create filesystem for path: {}", basePath);
+ throw e;
+ }
+
this.bucketStateSerializer = new BucketStateSerializer<>(
fsWriter.getResumeRecoverableSerializer(),
fsWriter.getCommitRecoverableSerializer(),
@@ -129,7 +140,11 @@ public class Buckets<IN, BucketID> {
* in-progress/pending part files
*/
void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {
+
initializePartCounter(partCounterState);
+
+ LOG.info("Subtask {} initializing its state (max part counter={}).", subtaskIndex, maxPartCounter);
+
initializeActiveBuckets(bucketStates);
}
@@ -153,6 +168,10 @@ public class Buckets<IN, BucketID> {
private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
final BucketID bucketId = recoveredState.getBucketId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
+ }
+
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
fsWriter,
@@ -179,6 +198,8 @@ public class Buckets<IN, BucketID> {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
activeBuckets.entrySet().iterator();
+ LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
+
while (activeBucketIt.hasNext()) {
final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
@@ -200,6 +221,9 @@ public class Buckets<IN, BucketID> {
fsWriter != null && bucketStateSerializer != null,
"sink has not been initialized");
+ LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
+ subtaskIndex, checkpointId, maxPartCounter);
+
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
partCounterStateContainer.add(maxPartCounter);
}
@@ -215,6 +239,10 @@ public class Buckets<IN, BucketID> {
.writeVersionAndSerialize(bucketStateSerializer, bucketState);
bucketStatesContainer.add(serializedBucketState);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
+ }
}
}