Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/23 14:46:32 UTC

[1/2] flink git commit: [FLINK-8600] Allow disabling truncate() check in BucketingSink

Repository: flink
Updated Branches:
  refs/heads/release-1.4 ba8d72ab5 -> 896797be7


[FLINK-8600] Allow disabling truncate() check in BucketingSink

The truncate() check was failing when using PrestoS3FileSystem because the check
doesn't use an absolute/qualified path for its test file.
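
For reference, disabling the new check from user code is a single builder call (a minimal sketch; the output path is a placeholder and 'stream' is an assumed DataStream<String>):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    // hypothetical output path; setUseTruncate(false) skips the truncate() probe entirely
    BucketingSink<String> sink = new BucketingSink<String>("s3://my-bucket/flink-output")
        .setUseTruncate(false);
    stream.addSink(sink);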


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4a88614
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4a88614
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4a88614

Branch: refs/heads/release-1.4
Commit: b4a886149cc5432fdb58c8785047399728846527
Parents: ba8d72a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Feb 14 14:48:22 2018 +0100
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Apr 23 14:02:54 2018 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 27 ++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4a88614/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 6293fe0..6e7f460 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -265,6 +265,8 @@ public class BucketingSink<T>
 
 	private String partPrefix = DEFAULT_PART_REFIX;
 
+	private boolean useTruncate = true;
+
 	/**
 	 * The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
 	 */
@@ -572,6 +574,12 @@ public class BucketingSink<T>
 	 * <p><b>NOTE:</b> This code comes from Flume.
 	 */
 	private Method reflectTruncate(FileSystem fs) {
+		// completely disable the check for truncate() because the check can be problematic
+		// on some filesystem implementations
+		if (!useTruncate) {
+			return null;
+		}
+
 		Method m = null;
 		if (fs != null) {
 			Class<?> fsClass = fs.getClass();
@@ -592,7 +600,9 @@ public class BucketingSink<T>
 				outputStream.close();
 			} catch (IOException e) {
 				LOG.error("Could not create file for checking if truncate works.", e);
-				throw new RuntimeException("Could not create file for checking if truncate works.", e);
+				throw new RuntimeException("Could not create file for checking if truncate works. " +
+					"You can disable support for truncate() completely via " +
+					"BucketingSink.setUseTruncate(false).", e);
 			}
 
 			try {
@@ -606,7 +616,9 @@ public class BucketingSink<T>
 				fs.delete(testPath, false);
 			} catch (IOException e) {
 				LOG.error("Could not delete truncate test file.", e);
-				throw new RuntimeException("Could not delete truncate test file.", e);
+				throw new RuntimeException("Could not delete truncate test file. " +
+					"You can disable support for truncate() completely via " +
+					"BucketingSink.setUseTruncate(false).", e);
 			}
 		}
 		return m;
@@ -983,6 +995,17 @@ public class BucketingSink<T>
 	}
 
 	/**
+	 * Sets whether to use {@code FileSystem.truncate()} to truncate written bucket files back to
+	 * a consistent state when restoring from a checkpoint. If {@code truncate()} is not used,
+	 * this sink writes a valid-length file for each affected bucket file; readers must consult
+	 * these files to avoid reading past the valid data.
+	 */
+	public BucketingSink<T> setUseTruncate(boolean useTruncate) {
+		this.useTruncate = useTruncate;
+		return this;
+	}
+
+	/**
 	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
 	 *
 	 *

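As the Javadoc above notes, with truncate() disabled the sink falls back to valid-length files, which readers must honor. A minimal sketch of consuming one (assuming the default ".valid-length" suffix; the helper name and paths are placeholders):

    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    // returns how many bytes of the given part file are valid after a restore
    static long readValidLength(File partFile) throws IOException {
        File lengthFile = new File(partFile.getPath() + ".valid-length");
        if (!lengthFile.exists()) {
            return partFile.length(); // no valid-length file: the whole part file is valid
        }
        try (DataInputStream in = new DataInputStream(new FileInputStream(lengthFile))) {
            // the sink stores the length as a UTF string via writeUTF()
            return Long.parseLong(in.readUTF());
        }
    }
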

[2/2] flink git commit: [FLINK-9113] [connectors] Use raw local file system for bucketing sink to prevent data loss

Posted by tw...@apache.org.
[FLINK-9113] [connectors] Use raw local file system for bucketing sink to prevent data loss

This change replaces Hadoop's LocalFileSystem (a checksumming filesystem) with its underlying RawLocalFileSystem implementation. Because the default filesystem computes checksums, it only flushes in 512-byte intervals, which might lead to data loss during checkpointing. To guarantee exact results, we skip the checksum computation and perform a raw flush.

Negative effect: existing checksums are no longer maintained and thus become invalid.

This closes #5861.
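
Condensed, the fix unwraps the checksumming filesystem before any writes happen (a sketch; the path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    FileSystem fs = FileSystem.get(new Path("file:///tmp/out").toUri(), new Configuration());
    if (fs instanceof LocalFileSystem) {
        // getRaw() returns the underlying RawLocalFileSystem, which flushes written
        // bytes directly instead of buffering them in 512-byte checksum chunks
        fs = ((LocalFileSystem) fs).getRaw();
    }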


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/896797be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/896797be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/896797be

Branch: refs/heads/release-1.4
Commit: 896797be71e920110aa270402d031969126221bb
Parents: b4a8861
Author: Timo Walther <tw...@apache.org>
Authored: Tue Apr 17 15:12:55 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Apr 23 14:06:10 2018 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |   2 +-
 .../connectors/fs/bucketing/BucketingSink.java  |   9 +-
 .../fs/RollingSinkFaultToleranceITCase.java     |   5 +-
 .../connectors/fs/RollingSinkITCase.java        |  73 ++++---------
 .../BucketingSinkFaultToleranceITCase.java      |   5 +-
 .../bucketing/BucketingSinkMigrationTest.java   |  49 ++-------
 .../fs/bucketing/BucketingSinkTest.java         | 109 +++++++------------
 .../fs/bucketing/BucketingSinkTestUtils.java    |  83 ++++++++++++++
 8 files changed, 170 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 4cd38a1..709a7c9 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -699,9 +699,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
 					}
 
 				} else {
-					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
 						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
 						lengthFileOut.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 6e7f460..01f84ef 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -44,6 +44,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
@@ -833,9 +834,9 @@ public class BucketingSink<T>
 						}
 					}
 				} else {
-					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
 						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
 						lengthFileOut.writeUTF(Long.toString(validLength));
 						lengthFileOut.close();
@@ -1227,6 +1228,12 @@ public class BucketingSink<T>
 			}
 
 			fs.initialize(fsUri, finalConf);
+
+			// Use the raw filesystem instead of Hadoop's checksumming local filesystem.
+			// Otherwise, buffers are not flushed entirely during checkpointing, which results in data loss.
+			if (fs instanceof LocalFileSystem) {
+				return ((LocalFileSystem) fs).getRaw();
+			}
 			return fs;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 7473685..886055d 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -51,6 +51,8 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.IN_PROGRESS_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -75,9 +77,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 	private static String outPath;
 
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-
 	@BeforeClass
 	public static void createHDFS() throws IOException {
 		Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 10d1846..81d378c 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -44,7 +44,6 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.StringType;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +70,8 @@ import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
+
 /**
  * Tests for {@link RollingSink}. These
  * tests test the different output methods as well as the rolling feature using a manual clock
@@ -648,32 +649,32 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		testHarness.processElement(new StreamRecord<>("test1", 1L));
 		testHarness.processElement(new StreamRecord<>("test2", 1L));
-		checkFs(outDir, 1, 1 , 0, 0);
+		checkLocalFs(outDir, 1, 1 , 0, 0);
 
 		testHarness.processElement(new StreamRecord<>("test3", 1L));
-		checkFs(outDir, 1, 2, 0, 0);
+		checkLocalFs(outDir, 1, 2, 0, 0);
 
 		testHarness.snapshot(0, 0);
-		checkFs(outDir, 1, 2, 0, 0);
+		checkLocalFs(outDir, 1, 2, 0, 0);
 
 		testHarness.notifyOfCompletedCheckpoint(0);
-		checkFs(outDir, 1, 0, 2, 0);
+		checkLocalFs(outDir, 1, 0, 2, 0);
 
 		OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
 
 		testHarness.close();
-		checkFs(outDir, 0, 1, 2, 0);
+		checkLocalFs(outDir, 0, 1, 2, 0);
 
 		testHarness = createRescalingTestSink(outDir, 1, 0);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
-		checkFs(outDir, 0, 0, 3, 1);
+		checkLocalFs(outDir, 0, 0, 3, 1);
 
 		snapshot = testHarness.snapshot(2, 0);
 
 		testHarness.processElement(new StreamRecord<>("test4", 10));
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 
 		testHarness = createRescalingTestSink(outDir, 1, 0);
 		testHarness.setup();
@@ -681,13 +682,13 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		testHarness.open();
 
 		// the in-progress file remains as we do not clean up now
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 
 		testHarness.close();
 
 		// at close it is not moved to final because it is not part
 		// of the current task's state, it was just a not cleaned up leftover.
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 	}
 
 	@Test
@@ -707,18 +708,18 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		testHarness3.open();
 
 		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		checkFs(outDir, 1, 0, 0, 0);
+		checkLocalFs(outDir, 1, 0, 0, 0);
 
 		testHarness2.processElement(new StreamRecord<>("test2", 0L));
 		testHarness2.processElement(new StreamRecord<>("test3", 0L));
 		testHarness2.processElement(new StreamRecord<>("test4", 0L));
 		testHarness2.processElement(new StreamRecord<>("test5", 0L));
 		testHarness2.processElement(new StreamRecord<>("test6", 0L));
-		checkFs(outDir, 2, 4, 0, 0);
+		checkLocalFs(outDir, 2, 4, 0, 0);
 
 		testHarness3.processElement(new StreamRecord<>("test7", 0L));
 		testHarness3.processElement(new StreamRecord<>("test8", 0L));
-		checkFs(outDir, 3, 5, 0, 0);
+		checkLocalFs(outDir, 3, 5, 0, 0);
 
 		// intentionally we snapshot them in a not ascending order so that the states are shuffled
 		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -738,14 +739,14 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		// we do not have a length file for part-2-0 because bucket part-2-0
 		// was not "in-progress", but "pending" (its full content is valid).
-		checkFs(outDir, 1, 4, 3, 2);
+		checkLocalFs(outDir, 1, 4, 3, 2);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1);
 		testHarness5.setup();
 		testHarness5.initializeState(mergedSnapshot);
 		testHarness5.open();
 
-		checkFs(outDir, 0, 0, 8, 3);
+		checkLocalFs(outDir, 0, 0, 8, 3);
 	}
 
 	@Test
@@ -763,13 +764,13 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		testHarness1.processElement(new StreamRecord<>("test1", 0L));
 		testHarness1.processElement(new StreamRecord<>("test2", 0L));
 
-		checkFs(outDir, 1, 1, 0, 0);
+		checkLocalFs(outDir, 1, 1, 0, 0);
 
 		testHarness2.processElement(new StreamRecord<>("test3", 0L));
 		testHarness2.processElement(new StreamRecord<>("test4", 0L));
 		testHarness2.processElement(new StreamRecord<>("test5", 0L));
 
-		checkFs(outDir, 2, 3, 0, 0);
+		checkLocalFs(outDir, 2, 3, 0, 0);
 
 		// intentionally we snapshot them in the reverse order so that the states are shuffled
 		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -782,28 +783,28 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		testHarness1.initializeState(mergedSnapshot);
 		testHarness1.open();
 
-		checkFs(outDir, 1, 1, 3, 1);
+		checkLocalFs(outDir, 1, 1, 3, 1);
 
 		testHarness2 = createRescalingTestSink(outDir, 3, 1);
 		testHarness2.setup();
 		testHarness2.initializeState(mergedSnapshot);
 		testHarness2.open();
 
-		checkFs(outDir, 0, 0, 5, 2);
+		checkLocalFs(outDir, 0, 0, 5, 2);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
 		testHarness3.setup();
 		testHarness3.initializeState(mergedSnapshot);
 		testHarness3.open();
 
-		checkFs(outDir, 0, 0, 5, 2);
+		checkLocalFs(outDir, 0, 0, 5, 2);
 
 		testHarness1.processElement(new StreamRecord<>("test6", 0));
 		testHarness2.processElement(new StreamRecord<>("test6", 0));
 		testHarness3.processElement(new StreamRecord<>("test6", 0));
 
 		// 3 for the different tasks
-		checkFs(outDir, 3, 0, 5, 2);
+		checkLocalFs(outDir, 3, 0, 5, 2);
 
 		testHarness1.snapshot(1, 0);
 		testHarness2.snapshot(1, 0);
@@ -813,7 +814,7 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		testHarness2.close();
 		testHarness3.close();
 
-		checkFs(outDir, 0, 3, 5, 2);
+		checkLocalFs(outDir, 0, 3, 5, 2);
 	}
 
 	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
@@ -838,34 +839,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
 	}
 
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
-	}
-
 	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 5e417d0..0122c18 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -51,6 +51,8 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.IN_PROGRESS_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -72,9 +74,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 
 	private static String outPath;
 
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-
 	@BeforeClass
 	public static void createHDFS() throws IOException {
 		Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 3b7c7d4..0450438 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -45,12 +44,16 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.IN_PROGRESS_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PART_PREFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.VALID_LENGTH_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -72,11 +75,6 @@ public class BucketingSinkMigrationTest {
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
 	@BeforeClass
 	public static void verifyOS() {
 		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
@@ -127,13 +125,13 @@ public class BucketingSinkMigrationTest {
 		testHarness.processElement(new StreamRecord<>("test1", 0L));
 		testHarness.processElement(new StreamRecord<>("test2", 0L));
 
-		checkFs(outDir, 1, 1, 0, 0);
+		checkLocalFs(outDir, 1, 1, 0, 0);
 
 		testHarness.processElement(new StreamRecord<>("test3", 0L));
 		testHarness.processElement(new StreamRecord<>("test4", 0L));
 		testHarness.processElement(new StreamRecord<>("test5", 0L));
 
-		checkFs(outDir, 1, 4, 0, 0);
+		checkLocalFs(outDir, 1, 4, 0, 0);
 
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 
@@ -155,7 +153,8 @@ public class BucketingSinkMigrationTest {
 			.setValidLengthPrefix("")
 			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
 			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX)
+			.setUseTruncate(false); // don't use truncate because files do not exist
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
 			new StreamSink<>(sink), 10, 1, 0);
@@ -174,39 +173,11 @@ public class BucketingSinkMigrationTest {
 		testHarness.processElement(new StreamRecord<>("test1", 0L));
 		testHarness.processElement(new StreamRecord<>("test2", 0L));
 
-		checkFs(outDir, 1, 1, 0, 0);
+		checkLocalFs(outDir, 1, 1, 0, 0);
 
 		testHarness.close();
 	}
 
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
-	}
-
 	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
 
 		private static final long serialVersionUID = -4263974081712009141L;

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index d6852ef..3c03d34 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -67,6 +67,12 @@ import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.IN_PROGRESS_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PART_PREFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.VALID_LENGTH_SUFFIX;
+import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
+
 /**
  * Tests for the {@link BucketingSink}.
  */
@@ -78,11 +84,6 @@ public class BucketingSinkTest {
 	private static org.apache.hadoop.fs.FileSystem dfs;
 	private static String hdfsURI;
 
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
 	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
 		File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception {
 
@@ -186,13 +187,13 @@ public class BucketingSinkTest {
 
 		testHarness.processElement(new StreamRecord<>("test1", 1L));
 		testHarness.processElement(new StreamRecord<>("test2", 1L));
-		checkFs(outDir, 2, 0 , 0, 0);
+		checkLocalFs(outDir, 2, 0 , 0, 0);
 
 		testHarness.setProcessingTime(101L);	// put some in pending
-		checkFs(outDir, 0, 2, 0, 0);
+		checkLocalFs(outDir, 0, 2, 0, 0);
 
 		testHarness.snapshot(0, 0);				// put them in pending for 0
-		checkFs(outDir, 0, 2, 0, 0);
+		checkLocalFs(outDir, 0, 2, 0, 0);
 
 		testHarness.processElement(new StreamRecord<>("test3", 1L));
 		testHarness.processElement(new StreamRecord<>("test4", 1L));
@@ -200,13 +201,13 @@ public class BucketingSinkTest {
 		testHarness.setProcessingTime(202L);	// put some in pending
 
 		testHarness.snapshot(1, 0);				// put them in pending for 1
-		checkFs(outDir, 0, 4, 0, 0);
+		checkLocalFs(outDir, 0, 4, 0, 0);
 
 		testHarness.notifyOfCompletedCheckpoint(0);	// put the pending for 0 to the "committed" state
-		checkFs(outDir, 0, 2, 2, 0);
+		checkLocalFs(outDir, 0, 2, 2, 0);
 
 		testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state
-		checkFs(outDir, 0, 0, 4, 0);
+		checkLocalFs(outDir, 0, 0, 4, 0);
 	}
 
 	@Test
@@ -221,36 +222,36 @@ public class BucketingSinkTest {
 
 		testHarness.processElement(new StreamRecord<>("test1", 1L));
 		testHarness.processElement(new StreamRecord<>("test2", 1L));
-		checkFs(outDir, 2, 0 , 0, 0);
+		checkLocalFs(outDir, 2, 0 , 0, 0);
 
 		// this is to check the inactivity threshold
 		testHarness.setProcessingTime(101L);
-		checkFs(outDir, 0, 2, 0, 0);
+		checkLocalFs(outDir, 0, 2, 0, 0);
 
 		testHarness.processElement(new StreamRecord<>("test3", 1L));
-		checkFs(outDir, 1, 2, 0, 0);
+		checkLocalFs(outDir, 1, 2, 0, 0);
 
 		testHarness.snapshot(0, 0);
-		checkFs(outDir, 1, 2, 0, 0);
+		checkLocalFs(outDir, 1, 2, 0, 0);
 
 		testHarness.notifyOfCompletedCheckpoint(0);
-		checkFs(outDir, 1, 0, 2, 0);
+		checkLocalFs(outDir, 1, 0, 2, 0);
 
 		OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
 
 		testHarness.close();
-		checkFs(outDir, 0, 1, 2, 0);
+		checkLocalFs(outDir, 0, 1, 2, 0);
 
 		testHarness = createRescalingTestSink(outDir, 1, 0, 100);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
-		checkFs(outDir, 0, 0, 3, 1);
+		checkLocalFs(outDir, 0, 0, 3, 1);
 
 		snapshot = testHarness.snapshot(2, 0);
 
 		testHarness.processElement(new StreamRecord<>("test4", 10));
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 
 		testHarness = createRescalingTestSink(outDir, 1, 0, 100);
 		testHarness.setup();
@@ -258,13 +259,13 @@ public class BucketingSinkTest {
 		testHarness.open();
 
 		// the in-progress file remains as we do not clean up now
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 
 		testHarness.close();
 
 		// at close it is not moved to final because it is not part
 		// of the current task's state, it was just a not cleaned up leftover.
-		checkFs(outDir, 1, 0, 3, 1);
+		checkLocalFs(outDir, 1, 0, 3, 1);
 	}
 
 	@Test
@@ -280,10 +281,10 @@ public class BucketingSinkTest {
 		testHarness2.open();
 
 		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		checkFs(outDir, 1, 0, 0, 0);
+		checkLocalFs(outDir, 1, 0, 0, 0);
 
 		testHarness2.processElement(new StreamRecord<>("test2", 0L));
-		checkFs(outDir, 2, 0, 0, 0);
+		checkLocalFs(outDir, 2, 0, 0, 0);
 
 		// intentionally we snapshot them in the reverse order so that the states are shuffled
 		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -291,11 +292,11 @@ public class BucketingSinkTest {
 			testHarness1.snapshot(0, 0)
 		);
 
-		checkFs(outDir, 2, 0, 0, 0);
+		checkLocalFs(outDir, 2, 0, 0, 0);
 
 		// this will not be included in any checkpoint so it can be cleaned up (although we do not)
 		testHarness2.processElement(new StreamRecord<>("test3", 0L));
-		checkFs(outDir, 3, 0, 0, 0);
+		checkLocalFs(outDir, 3, 0, 0, 0);
 
 		testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
 		testHarness1.setup();
@@ -304,20 +305,20 @@ public class BucketingSinkTest {
 
 		// the one in-progress will be the one assigned to the next instance,
 		// the other is the test3 which is just not cleaned up
-		checkFs(outDir, 2, 0, 1, 1);
+		checkLocalFs(outDir, 2, 0, 1, 1);
 
 		testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
 		testHarness2.setup();
 		testHarness2.initializeState(mergedSnapshot);
 		testHarness2.open();
 
-		checkFs(outDir, 1, 0, 2, 2);
+		checkLocalFs(outDir, 1, 0, 2, 2);
 
 		testHarness1.close();
 		testHarness2.close();
 
 		// the 1 in-progress can be discarded.
-		checkFs(outDir, 1, 0, 2, 2);
+		checkLocalFs(outDir, 1, 0, 2, 2);
 	}
 
 	@Test
@@ -337,14 +338,14 @@ public class BucketingSinkTest {
 		testHarness3.open();
 
 		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		checkFs(outDir, 1, 0, 0, 0);
+		checkLocalFs(outDir, 1, 0, 0, 0);
 
 		testHarness2.processElement(new StreamRecord<>("test2", 0L));
-		checkFs(outDir, 2, 0, 0, 0);
+		checkLocalFs(outDir, 2, 0, 0, 0);
 
 		testHarness3.processElement(new StreamRecord<>("test3", 0L));
 		testHarness3.processElement(new StreamRecord<>("test4", 0L));
-		checkFs(outDir, 4, 0, 0, 0);
+		checkLocalFs(outDir, 4, 0, 0, 0);
 
 		// intentionally we snapshot them in the reverse order so that the states are shuffled
 		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -358,14 +359,14 @@ public class BucketingSinkTest {
 		testHarness1.initializeState(mergedSnapshot);
 		testHarness1.open();
 
-		checkFs(outDir, 1, 0, 3, 3);
+		checkLocalFs(outDir, 1, 0, 3, 3);
 
 		testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
 		testHarness2.setup();
 		testHarness2.initializeState(mergedSnapshot);
 		testHarness2.open();
 
-		checkFs(outDir, 0, 0, 4, 4);
+		checkLocalFs(outDir, 0, 0, 4, 4);
 	}
 
 	@Test
@@ -383,13 +384,13 @@ public class BucketingSinkTest {
 		testHarness1.processElement(new StreamRecord<>("test1", 1L));
 		testHarness1.processElement(new StreamRecord<>("test2", 1L));
 
-		checkFs(outDir, 2, 0, 0, 0);
+		checkLocalFs(outDir, 2, 0, 0, 0);
 
 		testHarness2.processElement(new StreamRecord<>("test3", 1L));
 		testHarness2.processElement(new StreamRecord<>("test4", 1L));
 		testHarness2.processElement(new StreamRecord<>("test5", 1L));
 
-		checkFs(outDir, 5, 0, 0, 0);
+		checkLocalFs(outDir, 5, 0, 0, 0);
 
 		// intentionally we snapshot them in the reverse order so that the states are shuffled
 		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -402,27 +403,27 @@ public class BucketingSinkTest {
 		testHarness1.initializeState(mergedSnapshot);
 		testHarness1.open();
 
-		checkFs(outDir, 2, 0, 3, 3);
+		checkLocalFs(outDir, 2, 0, 3, 3);
 
 		testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
 		testHarness2.setup();
 		testHarness2.initializeState(mergedSnapshot);
 		testHarness2.open();
 
-		checkFs(outDir, 0, 0, 5, 5);
+		checkLocalFs(outDir, 0, 0, 5, 5);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2, 100);
 		testHarness3.setup();
 		testHarness3.initializeState(mergedSnapshot);
 		testHarness3.open();
 
-		checkFs(outDir, 0, 0, 5, 5);
+		checkLocalFs(outDir, 0, 0, 5, 5);
 
 		testHarness1.processElement(new StreamRecord<>("test6", 0));
 		testHarness2.processElement(new StreamRecord<>("test6", 0));
 		testHarness3.processElement(new StreamRecord<>("test6", 0));
 
-		checkFs(outDir, 3, 0, 5, 5);
+		checkLocalFs(outDir, 3, 0, 5, 5);
 
 		testHarness1.snapshot(1, 0);
 		testHarness2.snapshot(1, 0);
@@ -432,35 +433,7 @@ public class BucketingSinkTest {
 		testHarness2.close();
 		testHarness3.close();
 
-		checkFs(outDir, 0, 3, 5, 5);
-	}
-
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
+		checkLocalFs(outDir, 0, 3, 5, 5);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/896797be/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTestUtils.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTestUtils.java
new file mode 100644
index 0000000..de557b7
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTestUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Test utilities for bucketing sinks.
+ */
+public class BucketingSinkTestUtils {
+
+	public static final String PART_PREFIX = "part";
+	public static final String PENDING_SUFFIX = ".pending";
+	public static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	public static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	/**
+	 * Verifies the expected number of written files and that each valid-length file is consistent with its part file's size.
+	 */
+	public static void checkLocalFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				// check that content of length file is valid
+				try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
+					final long validLength = Long.valueOf(dis.readUTF());
+					final String truncated = path.substring(0, path.length() - VALID_LENGTH_SUFFIX.length());
+					Assert.assertTrue("Mismatch between valid length and file size.",
+						FileUtils.sizeOf(new File(truncated)) >= validLength);
+				}
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		// only check the valid-length file count if truncate() is not supported
+		try {
+			RawLocalFileSystem.class.getMethod("truncate", Path.class, long.class);
+		} catch (NoSuchMethodException e) {
+			Assert.assertEquals(valid, val);
+		}
+	}
+}