Posted to commits@flink.apache.org by kk...@apache.org on 2018/08/01 08:16:22 UTC

[flink] branch master updated: [FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 296f6a8  [FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets
296f6a8 is described below

commit 296f6a8264d6bf15c08bff5766bdda939600fbe7
Author: kkloudas <kk...@gmail.com>
AuthorDate: Wed Aug 1 07:59:42 2018 +0200

    [FLINK-10005][DataStream API] StreamingFileSink: initPartCounter=maxUsed in new Buckets
    
    This closes #6466.
---
 .../api/functions/sink/filesystem/Bucket.java      | 12 +++++
 .../api/functions/sink/filesystem/Buckets.java     | 17 +++---
 .../filesystem/LocalStreamingFileSinkTest.java     | 12 ++---
 .../sink/filesystem/RollingPolicyTest.java         | 61 +++++++++++++++++++++-
 4 files changed, 86 insertions(+), 16 deletions(-)
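
In short, the change removes the separate initMaxPartCounter field and lets both restored and newly created buckets start from maxPartCounterUsed, the highest part counter this subtask has used so far, so part file indices keep increasing across buckets instead of restarting from the value captured at restore time. A minimal sketch of that bookkeeping, with simplified names rather than the actual Buckets internals (in the real class the counter is presumably advanced as buckets open part files, which is outside this diff):

    // Sketch only: simplified stand-in for the counter handling in Buckets.
    import java.util.HashMap;
    import java.util.Map;

    class PartCounterSketch {

        // highest part counter used by any bucket of this subtask so far
        private long maxPartCounterUsed = 0L;

        private final Map<String, Long> activeBucketCounters = new HashMap<>();

        // on restore, continue from the maximum counter found in the restored state
        void initializeState(Iterable<Long> restoredPartCounters) {
            for (long counter : restoredPartCounters) {
                maxPartCounterUsed = Math.max(maxPartCounterUsed, counter);
            }
        }

        // new and restored buckets both start from the max used counter,
        // so a freshly created bucket can never reuse an earlier index
        long initialCounterForBucket(String bucketId) {
            activeBucketCounters.put(bucketId, maxPartCounterUsed);
            return maxPartCounterUsed;
        }

        // assumed hook: whenever a bucket rolls to a new part file, record it
        void onPartFileOpened(String bucketId, long partCounter) {
            activeBucketCounters.put(bucketId, partCounter);
            maxPartCounterUsed = Math.max(maxPartCounterUsed, partCounter);
        }
    }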

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index ec59233..a350096 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -115,6 +115,18 @@ public class Bucket<IN, BucketID> {
 		this.pending = new ArrayList<>();
 	}
 
+	/**
+	 * Gets the information available for the currently
+	 * open part file, i.e. the one we are currently writing to.
+	 *
+	 * <p>This will be {@code null} if there is no currently open part file,
+	 * i.e. for a newly created bucket, or for a bucket whose previously
+	 * open in-progress file was closed by the rolling policy and that has
+	 * not received any data since.
+	 *
+	 * @return The information about the currently in-progress part file
+	 * or {@code null} if there is no open part file.
+	 */
 	public PartFileInfo<BucketID> getInProgressPartInfo() {
 		return currentPart;
 	}
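
As a usage note, a caller is expected to treat a null return as "no open part file" and roll before writing, which is exactly what the Buckets change further down does. A hedged sketch of that pattern, written as a method as it could sit alongside the code in Buckets (placeholder name, approximated throws clause):

    // Illustrative caller of getInProgressPartInfo(); mirrors the event path in
    // Buckets below. Types are the ones from this package; exceptions approximated.
    <IN, BucketID> void writeWithRollCheck(
            Bucket<IN, BucketID> bucket,
            RollingPolicy<IN, BucketID> rollingPolicy,
            IN value,
            long currentTime) throws Exception {

        final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
        if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
            // null means no open part file: a brand new bucket, or one whose
            // previous in-progress file was already closed by the rolling policy
            bucket.rollPartFile(currentTime);
        }
        bucket.write(value, currentTime);
    }
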
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 7e9dd61..e62c425 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -70,8 +70,6 @@ public class Buckets<IN, BucketID> {
 
 	private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
 
-	private long initMaxPartCounter;
-
 	private long maxPartCounterUsed;
 
 	private final RecoverableWriter fileSystemWriter;
@@ -114,7 +112,6 @@ public class Buckets<IN, BucketID> {
 				bucketer.getSerializer()
 		);
 
-		this.initMaxPartCounter = 0L;
 		this.maxPartCounterUsed = 0L;
 	}
 
@@ -137,7 +134,7 @@ public class Buckets<IN, BucketID> {
 		for (long partCounter: partCounterState.get()) {
 			maxCounter = Math.max(partCounter, maxCounter);
 		}
-		initMaxPartCounter = maxCounter;
+		maxPartCounterUsed = maxCounter;
 
 		// get the restored buckets
 		for (byte[] recoveredState : bucketStates.get()) {
@@ -151,7 +148,7 @@ public class Buckets<IN, BucketID> {
 			final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
 					fileSystemWriter,
 					subtaskIndex,
-					initMaxPartCounter,
+					maxPartCounterUsed,
 					partFileWriterFactory,
 					bucketState
 			);
@@ -200,8 +197,6 @@ public class Buckets<IN, BucketID> {
 			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
 
 			if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
-				// we also check here so that we do not have to always
-				// wait for the "next" element to arrive.
 				bucket.closePartFile();
 			}
 
@@ -237,13 +232,19 @@ public class Buckets<IN, BucketID> {
 					subtaskIndex,
 					bucketId,
 					bucketPath,
-					initMaxPartCounter,
+					maxPartCounterUsed,
 					partFileWriterFactory);
 			activeBuckets.put(bucketId, bucket);
 		}
 
 		final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
 		if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
+
+			// info is null if there is no currently open part file, i.e. for a
+			// newly created bucket, or for a bucket whose previously open
+			// in-progress file was closed by the rolling policy and that has
+			// not received any data since.
+
 			bucket.rollPartFile(currentProcessingTime);
 		}
 		bucket.write(value, currentProcessingTime);
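
Both call sites above delegate the decision to the rolling policy: shouldRollOnCheckpoint(info) when snapshotting and shouldRollOnEvent(info, value) per record. For illustration, a policy-shaped class matching those two calls could look as follows; this is only a sketch, and the 128 MB threshold as well as the getSize() accessor are assumptions made for the example, not something this commit defines:

    // Sketch of a policy-shaped class matching the two calls made in Buckets;
    // not the actual RollingPolicy contract. getSize() and the threshold are
    // assumptions made for the example.
    import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;

    import java.io.IOException;

    class RollOnSizeOrCheckpointSketch<IN, BucketID> {

        private static final long MAX_PART_SIZE_BYTES = 128L * 1024L * 1024L;

        // roll on every checkpoint so the closed part can be published on ack
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> info) throws IOException {
            return true;
        }

        // roll mid-stream once the in-progress file grows past the threshold
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> info, IN element) throws IOException {
            return info.getSize() > MAX_PART_SIZE_BYTES;
        }
    }
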
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 4b1e743..a0c438e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -323,7 +323,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 					Assert.assertEquals("test1@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-1", fileContents.getKey().getName());
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
 					bucketCounter++;
@@ -346,11 +346,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-2", fileContents.getKey().getName());
 					Assert.assertEquals("test3@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test4")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-3", fileContents.getKey().getName());
 					Assert.assertEquals("test4@1\n", fileContents.getValue());
 				}
 			}
@@ -437,8 +437,8 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 							inProgressFilename.contains(".part-1-0.inprogress")
 						)
 				) {
-						counter++;
-				} else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-0.inprogress")) {
+					counter++;
+				} else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-1.inprogress")) {
 					counter++;
 				}
 			}
@@ -476,7 +476,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 						counter++;
 						Assert.assertTrue(fileContents.getValue().equals("test1@1\n") || fileContents.getValue().equals("test1@0\n"));
 					}
-				} else if (parentFilename.equals("test2") && filename.contains(".part-1-0.inprogress")) {
+				} else if (parentFilename.equals("test2") && filename.contains(".part-1-1.inprogress")) {
 					counter++;
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				}
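
The expected names in these assertions change because the part counter is now shared across all buckets of a subtask: a bucket created later starts at the highest counter already used, hence part-0-1, part-0-2 and part-0-3 instead of a fresh part-0-0 per bucket. The naming pattern asserted above is roughly the following (hypothetical helpers, not Flink utilities):

    // Hypothetical helpers reproducing the name pattern seen in the assertions;
    // just an illustration of "part-<subtask>-<counter>" and the ".inprogress"
    // decoration carried by files that are still open.
    class PartFileNames {

        static String finished(int subtaskIndex, long partCounter) {
            return "part-" + subtaskIndex + "-" + partCounter;                 // e.g. "part-0-1"
        }

        static String inProgressPrefix(int subtaskIndex, long partCounter) {
            return "." + finished(subtaskIndex, partCounter) + ".inprogress."; // e.g. ".part-1-1.inprogress."
        }
    }
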
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index db54de9..f16a908 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -25,12 +25,19 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 /**
  * Tests for different {@link RollingPolicy rolling policies}.
@@ -134,24 +141,74 @@ public class RollingPolicyTest {
 			// we take a checkpoint so we roll.
 			testHarness.snapshot(1L, 1L);
 
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
+				}
+			}
+
 			// this will create a new part file
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
 			TestUtils.checkLocalFs(outDir, 3, 0);
 
+			testHarness.notifyOfCompletedCheckpoint(1L);
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertTrue(
+							file.getName().contains(".part-0-2.inprogress.") || file.getName().equals("part-0-1")
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertEquals("part-0-0", file.getName());
+				}
+			}
+
 			// and open and fill .part-0-2.inprogress
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
-			TestUtils.checkLocalFs(outDir, 3, 0);                    // nothing committed yet
+			TestUtils.checkLocalFs(outDir, 1, 2);
 
 			// we take a checkpoint so we roll.
 			testHarness.snapshot(2L, 2L);
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
-			TestUtils.checkLocalFs(outDir, 4, 0);
+			TestUtils.checkLocalFs(outDir, 2, 2);
+
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-2.inprogress."))
+									.or(equalTo("part-0-1"))
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-3.inprogress."))
+									.or(equalTo("part-0-0"))
+					);
+				}
+			}
 
 			// we acknowledge the last checkpoint so we should publish all but the latest in-progress file
 			testHarness.notifyOfCompletedCheckpoint(2L);
+
 			TestUtils.checkLocalFs(outDir, 1, 3);
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertThat(
+							file.getName(),
+							either(equalTo("part-0-2")).or(equalTo("part-0-1"))
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-3.inprogress."))
+									.or(equalTo("part-0-0"))
+					);
+				}
+			}
 		}
 	}
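
The new assertion blocks all follow the same pattern: walk the output directory with FileUtils.listFiles and group the expectations by bucket directory, checking that files rolled at a checkpoint lose their ".inprogress" decoration once that checkpoint is acknowledged. If that pattern keeps growing, a small helper along the following lines could collect the names once per check; this is only a sketch, the tests in this commit inline the loops instead:

    // Sketch of a possible helper for the repeated listing loops above; purely
    // illustrative, not something this commit adds.
    import org.apache.commons.io.FileUtils;

    import java.io.File;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BucketFiles {

        // maps bucket directory name (e.g. "test1") to the file names found in it
        static Map<String, List<String>> namesPerBucket(File outDir) {
            final Map<String, List<String>> names = new HashMap<>();
            for (File file : FileUtils.listFiles(outDir, null, true)) {
                names.computeIfAbsent(file.getParentFile().getName(), k -> new ArrayList<>())
                        .add(file.getName());
            }
            return names;
        }
    }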