You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this text.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/08/01 08:20:24 UTC
[flink] branch release-1.6 updated: [FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 0bb960e [FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets
0bb960e is described below
commit 0bb960e2acc1301c4fd4a23fab9c21b3df8718ae
Author: kkloudas <kk...@gmail.com>
AuthorDate: Wed Aug 1 07:59:42 2018 +0200
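[FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets

Previously, a newly created bucket started its part counter from initMaxPartCounter, the maximum counter restored from state at startup. This commit removes that field and starts both new and restored buckets from maxPartCounterUsed instead, so buckets created later in a subtask no longer restart at part-0-0 (the updated tests now expect part-0-1, part-0-2 and part-0-3).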
---
.../api/functions/sink/filesystem/Bucket.java | 12 +++++
.../api/functions/sink/filesystem/Buckets.java | 17 +++---
.../filesystem/LocalStreamingFileSinkTest.java | 12 ++---
.../sink/filesystem/RollingPolicyTest.java | 61 +++++++++++++++++++++-
4 files changed, 86 insertions(+), 16 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index ec59233..a350096 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -115,6 +115,18 @@ public class Bucket<IN, BucketID> {
this.pending = new ArrayList<>();
}
+ /**
+ * Gets the information available for the currently
+ * open part file, i.e. the one we are currently writing to.
+ *
+ * <p>This will be null if there is no currently open part file. This
+ * is the case when we have a new, just created bucket or a bucket
+ * that has not received any data after the closing of its previously
+ * open in-progress file due to the specified rolling policy.
+ *
+ * @return The information about the currently in-progress part file
+ * or {@code null} if there is no open part file.
+ */
public PartFileInfo<BucketID> getInProgressPartInfo() {
return currentPart;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 7e9dd61..e62c425 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -70,8 +70,6 @@ public class Buckets<IN, BucketID> {
private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
- private long initMaxPartCounter;
-
private long maxPartCounterUsed;
private final RecoverableWriter fileSystemWriter;
@@ -114,7 +112,6 @@ public class Buckets<IN, BucketID> {
bucketer.getSerializer()
);
- this.initMaxPartCounter = 0L;
this.maxPartCounterUsed = 0L;
}
@@ -137,7 +134,7 @@ public class Buckets<IN, BucketID> {
for (long partCounter: partCounterState.get()) {
maxCounter = Math.max(partCounter, maxCounter);
}
- initMaxPartCounter = maxCounter;
+ maxPartCounterUsed = maxCounter;
// get the restored buckets
for (byte[] recoveredState : bucketStates.get()) {
@@ -151,7 +148,7 @@ public class Buckets<IN, BucketID> {
final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
fileSystemWriter,
subtaskIndex,
- initMaxPartCounter,
+ maxPartCounterUsed,
partFileWriterFactory,
bucketState
);
@@ -200,8 +197,6 @@ public class Buckets<IN, BucketID> {
final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
- // we also check here so that we do not have to always
- // wait for the "next" element to arrive.
bucket.closePartFile();
}
@@ -237,13 +232,19 @@ public class Buckets<IN, BucketID> {
subtaskIndex,
bucketId,
bucketPath,
- initMaxPartCounter,
+ maxPartCounterUsed,
partFileWriterFactory);
activeBuckets.put(bucketId, bucket);
}
final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
+
+ // info will be null if there is no currently open part file. This
+ // is the case when we have a new, just created bucket or a bucket
+ // that has not received any data after the closing of its previously
+ // open in-progress file due to the specified rolling policy.
+
bucket.rollPartFile(currentProcessingTime);
}
bucket.write(value, currentProcessingTime);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 4b1e743..a0c438e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -323,7 +323,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals("test1@1\n", fileContents.getValue());
} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
bucketCounter++;
- Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+ Assert.assertEquals("part-0-1", fileContents.getKey().getName());
Assert.assertEquals("test2@1\n", fileContents.getValue());
} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
bucketCounter++;
@@ -346,11 +346,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals("test2@1\n", fileContents.getValue());
} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
bucketCounter++;
- Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+ Assert.assertEquals("part-0-2", fileContents.getKey().getName());
Assert.assertEquals("test3@1\n", fileContents.getValue());
} else if (fileContents.getKey().getParentFile().getName().equals("test4")) {
bucketCounter++;
- Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+ Assert.assertEquals("part-0-3", fileContents.getKey().getName());
Assert.assertEquals("test4@1\n", fileContents.getValue());
}
}
@@ -437,8 +437,8 @@ public class LocalStreamingFileSinkTest extends TestLogger {
inProgressFilename.contains(".part-1-0.inprogress")
)
) {
- counter++;
- } else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-0.inprogress")) {
+ counter++;
+ } else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-1.inprogress")) {
counter++;
}
}
@@ -476,7 +476,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
counter++;
Assert.assertTrue(fileContents.getValue().equals("test1@1\n") || fileContents.getValue().equals("test1@0\n"));
}
- } else if (parentFilename.equals("test2") && filename.contains(".part-1-0.inprogress")) {
+ } else if (parentFilename.equals("test2") && filename.contains(".part-1-1.inprogress")) {
counter++;
Assert.assertEquals("test2@1\n", fileContents.getValue());
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index db54de9..f16a908 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -25,12 +25,19 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
/**
* Tests for different {@link RollingPolicy rolling policies}.
@@ -134,24 +141,74 @@ public class RollingPolicyTest {
// we take a checkpoint so we roll.
testHarness.snapshot(1L, 1L);
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
+ } else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
+ }
+ }
+
// this will create a new part file
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
TestUtils.checkLocalFs(outDir, 3, 0);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertTrue(
+ file.getName().contains(".part-0-2.inprogress.") || file.getName().equals("part-0-1")
+ );
+ } else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertEquals("part-0-0", file.getName());
+ }
+ }
+
// and open and fill .part-0-2.inprogress
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
- TestUtils.checkLocalFs(outDir, 3, 0); // nothing committed yet
+ TestUtils.checkLocalFs(outDir, 1, 2);
// we take a checkpoint so we roll.
testHarness.snapshot(2L, 2L);
testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
- TestUtils.checkLocalFs(outDir, 4, 0);
+ TestUtils.checkLocalFs(outDir, 2, 2);
+
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertThat(
+ file.getName(),
+ either(containsString(".part-0-2.inprogress."))
+ .or(equalTo("part-0-1"))
+ );
+ } else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertThat(
+ file.getName(),
+ either(containsString(".part-0-3.inprogress."))
+ .or(equalTo("part-0-0"))
+ );
+ }
+ }
// we acknowledge the last checkpoint so we should publish all but the latest in-progress file
testHarness.notifyOfCompletedCheckpoint(2L);
+
TestUtils.checkLocalFs(outDir, 1, 3);
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertThat(
+ file.getName(),
+ either(equalTo("part-0-2")).or(equalTo("part-0-1"))
+ );
+ } else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertThat(
+ file.getName(),
+ either(containsString(".part-0-3.inprogress."))
+ .or(equalTo("part-0-0"))
+ );
+ }
+ }
}
}