You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/09 11:02:08 UTC

flink git commit: [FLINK-9603] Fix part counter loop in BucketingSink to account for part suffix.

Repository: flink
Updated Branches:
  refs/heads/master b5b4fb9bc -> b12acea2a


[FLINK-9603] Fix part counter loop in BucketingSink to account for part suffix.

1. all logic, that is responsible for path assembly moved into method;
2. test logic of part file indexing, when in-progress/ pending/ final
   part files already exists in bucket;
3. test the same logic, when part file has suffix

This closes #6176.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b12acea2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b12acea2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b12acea2

Branch: refs/heads/master
Commit: b12acea2afcd75a9183aca549fea426e86bfc5de
Parents: b5b4fb9
Author: Rinat Sharipov <r....@cleverdata.ru>
Authored: Sun Jun 17 15:03:54 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Mon Jul 9 13:01:29 2018 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 13 ++--
 .../fs/bucketing/BucketingSinkTest.java         | 79 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 23e4e0c..34fb1b7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -552,21 +552,17 @@ public class BucketingSink<T>
 		// clean the base directory in case of rescaling.
 
 		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
+		Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
 		while (fs.exists(partPath) ||
 				fs.exists(getPendingPathFor(partPath)) ||
 				fs.exists(getInProgressPathFor(partPath))) {
 			bucketState.partCounter++;
-			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
+			partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
 		}
 
 		// Record the creation time of the bucket
 		bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
 
-		if (partSuffix != null) {
-			partPath = partPath.suffix(partSuffix);
-		}
-
 		// increase, so we don't have to check for this name next time
 		bucketState.partCounter++;
 
@@ -667,6 +663,11 @@ public class BucketingSink<T>
 		return m;
 	}
 
+	private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) {
+		String localPartSuffix = partSuffix != null ? partSuffix : "";
+		return new Path(bucket, String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partIndex, localPartSuffix));
+	}
+
 	private Path getPendingPathFor(Path path) {
 		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12acea2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index dc84846..ee425ff 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -64,6 +64,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -72,6 +74,8 @@ import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTe
 import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
 import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.VALID_LENGTH_SUFFIX;
 import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests for the {@link BucketingSink}.
@@ -899,6 +903,81 @@ public class BucketingSinkTest extends TestLogger {
 		inStream.close();
 	}
 
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
+		throws Exception {
+		testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
+	}
+
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInPendingState()
+		throws Exception {
+		testThatPartIndexIsIncremented(".my", "part-0-0.my" + PENDING_SUFFIX);
+	}
+
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInFinalState()
+		throws Exception {
+		testThatPartIndexIsIncremented(".my", "part-0-0.my");
+	}
+
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInProgressState()
+		throws Exception {
+		testThatPartIndexIsIncremented(null, "part-0-0" + IN_PROGRESS_SUFFIX);
+	}
+
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInPendingState()
+		throws Exception {
+		testThatPartIndexIsIncremented(null, "part-0-0" + PENDING_SUFFIX);
+	}
+
+	@Test
+	public void testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInFinalState()
+		throws Exception {
+		testThatPartIndexIsIncremented(null, "part-0-0");
+	}
+
+	private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception {
+		File outDir = tempFolder.newFolder();
+		long inactivityInterval = 100;
+
+		java.nio.file.Path bucket = Paths.get(outDir.getPath());
+		Files.createFile(bucket.resolve(existingPartFile));
+
+		String basePath = outDir.getAbsolutePath();
+		BucketingSink<String> sink = new BucketingSink<String>(basePath)
+			.setBucketer(new BasePathBucketer<>())
+			.setInactiveBucketCheckInterval(inactivityInterval)
+			.setInactiveBucketThreshold(inactivityInterval)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX)
+			.setPartSuffix(partSuffix)
+			.setBatchSize(0);
+
+		try (OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0)) {
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			testHarness.processElement(new StreamRecord<>("test1", 1L));
+
+			testHarness.setProcessingTime(101L);
+			testHarness.snapshot(0, 0);
+			testHarness.notifyOfCompletedCheckpoint(0);
+		}
+
+		String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix;
+		assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
+	}
+
 	private static class StreamWriterWithConfigCheck<K, V> extends AvroKeyValueSinkWriter<K, V> {
 		private Map<String, String> properties;
 		private String key;