You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/13 10:45:51 UTC

[3/4] flink git commit: [FLINK-5443] Migrate from Rolling to Bucketing sink.

[FLINK-5443] Migrate from Rolling to Bucketing sink.

To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change his code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume his program from
that savepoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4c60a94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4c60a94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4c60a94

Branch: refs/heads/release-1.2
Commit: b4c60a942fe07e355dd49ed2aab3c0a7ae94285d
Parents: 078e248
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 16:52:51 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  10 +-
 .../connectors/fs/bucketing/BucketingSink.java  | 278 +++++++++++--------
 .../fs/bucketing/RollingSinkMigrationTest.java  |  17 ++
 .../RollingToBucketingMigrationTest.java        | 161 +++++++++++
 4 files changed, 348 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 98eb2d4..429d00a 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -905,30 +905,30 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	 * This is used for keeping track of the current in-progress files and files that we mark
 	 * for moving from pending to final location after we get a checkpoint-complete notification.
 	 */
-	static final class BucketState implements Serializable {
+	public static final class BucketState implements Serializable {
 		private static final long serialVersionUID = 1L;
 
 		/**
 		 * The file that was in-progress when the last checkpoint occurred.
 		 */
-		String currentFile;
+		public String currentFile;
 
 		/**
 		 * The valid length of the in-progress file at the time of the last checkpoint.
 		 */
-		long currentFileValidLength = -1;
+		public long currentFileValidLength = -1;
 
 		/**
 		 * Pending files that accumulated since the last checkpoint.
 		 */
-		List<String> pendingFiles = new ArrayList<>();
+		public List<String> pendingFiles = new ArrayList<>();
 
 		/**
 		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
 		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
 		 * pending files of completed checkpoints to their final location.
 		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
+		public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
 
 		@Override
 		public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index e8bff21..7dbcda7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@ import java.util.Iterator;
  */
 public class BucketingSink<T>
 		extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
+					CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
 
 	private static final long serialVersionUID = 1L;
 
@@ -344,7 +347,12 @@ public class BucketingSink<T>
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 		Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
 
-		initFileSystem();
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+		}
 
 		if (this.refTruncate == null) {
 			this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ public class BucketingSink<T>
 			// (we re-start from the last **successful** checkpoint)
 			bucketState.pendingFiles.clear();
 
-			if (bucketState.currentFile != null) {
+			handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
 
-				// We were writing to a file when the last checkpoint occurred. This file can either
-				// be still in-progress or became a pending file at some point after the checkpoint.
-				// Either way, we have to truncate it back to a valid state (or write a .valid-length
-				// file that specifies up to which length it is valid) and rename it to the final name
-				// before starting a new bucket file.
+			// Now that we've restored the bucket to a valid state, reset the current file info
+			bucketState.currentFile = null;
+			bucketState.currentFileValidLength = -1;
+			bucketState.isWriterOpen = false;
 
-				Path partPath = new Path(bucketState.currentFile);
-				try {
-					Path partPendingPath = getPendingPathFor(partPath);
-					Path partInProgressPath = getInProgressPathFor(partPath);
-
-					if (fs.exists(partPendingPath)) {
-						LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
-						// has been moved to pending in the mean time, rename to final location
-						fs.rename(partPendingPath, partPath);
-					} else if (fs.exists(partInProgressPath)) {
-						LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
-						// it was still in progress, rename to final path
-						fs.rename(partInProgressPath, partPath);
-					} else if (fs.exists(partPath)) {
-						LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
-					} else {
-						LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
-							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
-					}
+			handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
 
-					// We use reflection to get the .truncate() method, this
-					// is only available starting with Hadoop 2.7
-					if (this.refTruncate == null) {
-						this.refTruncate = reflectTruncate(fs);
-					}
+			synchronized (bucketState.pendingFilesPerCheckpoint) {
+				bucketState.pendingFilesPerCheckpoint.clear();
+			}
+		}
+	}
+
+	private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
+		restoredState.pendingFiles.clear();
 
-					// truncate it or write a ".valid-length" file to specify up to which point it is valid
-					if (refTruncate != null) {
-						LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-						// some-one else might still hold the lease from a previous try, we are
-						// recovering, after all ...
-						if (fs instanceof DistributedFileSystem) {
-							DistributedFileSystem dfs = (DistributedFileSystem) fs;
-							LOG.debug("Trying to recover file lease {}", partPath);
-							dfs.recoverLease(partPath);
-							boolean isclosed = dfs.isFileClosed(partPath);
-							StopWatch sw = new StopWatch();
-							sw.start();
-							while (!isclosed) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								isclosed = dfs.isFileClosed(partPath);
+		handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
+
+		// Now that we've restored the bucket to a valid state, reset the current file info
+		restoredState.currentFile = null;
+		restoredState.currentFileValidLength = -1;
+
+		handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
+
+		synchronized (restoredState.pendingFilesPerCheckpoint) {
+			restoredState.pendingFilesPerCheckpoint.clear();
+		}
+	}
+
+	private void handlePendingInProgressFile(String file, long validLength) {
+		if (file != null) {
+
+			// We were writing to a file when the last checkpoint occurred. This file can either
+			// be still in-progress or became a pending file at some point after the checkpoint.
+			// Either way, we have to truncate it back to a valid state (or write a .valid-length
+			// file that specifies up to which length it is valid) and rename it to the final name
+			// before starting a new bucket file.
+
+			Path partPath = new Path(file);
+			try {
+				Path partPendingPath = getPendingPathFor(partPath);
+				Path partInProgressPath = getInProgressPathFor(partPath);
+
+				if (fs.exists(partPendingPath)) {
+					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+					// has been moved to pending in the mean time, rename to final location
+					fs.rename(partPendingPath, partPath);
+				} else if (fs.exists(partInProgressPath)) {
+					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+					// it was still in progress, rename to final path
+					fs.rename(partInProgressPath, partPath);
+				} else if (fs.exists(partPath)) {
+					LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
+				} else {
+					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
+						"it was moved to final location by a previous snapshot restore", file);
+				}
+
+				// We use reflection to get the .truncate() method, this
+				// is only available starting with Hadoop 2.7
+				if (this.refTruncate == null) {
+					this.refTruncate = reflectTruncate(fs);
+				}
+
+				// truncate it or write a ".valid-length" file to specify up to which point it is valid
+				if (refTruncate != null) {
+					LOG.debug("Truncating {} to valid length {}", partPath, validLength);
+					// some-one else might still hold the lease from a previous try, we are
+					// recovering, after all ...
+					if (fs instanceof DistributedFileSystem) {
+						DistributedFileSystem dfs = (DistributedFileSystem) fs;
+						LOG.debug("Trying to recover file lease {}", partPath);
+						dfs.recoverLease(partPath);
+						boolean isclosed = dfs.isFileClosed(partPath);
+						StopWatch sw = new StopWatch();
+						sw.start();
+						while (!isclosed) {
+							if (sw.getTime() > asyncTimeout) {
+								break;
+							}
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
 							}
+							isclosed = dfs.isFileClosed(partPath);
 						}
-						Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-						if (!truncated) {
-							LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
-							// we must wait for the asynchronous truncate operation to complete
-							StopWatch sw = new StopWatch();
-							sw.start();
-							long newLen = fs.getFileStatus(partPath).getLen();
-							while (newLen != bucketState.currentFileValidLength) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								newLen = fs.getFileStatus(partPath).getLen();
+					}
+					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
+					if (!truncated) {
+						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+						// we must wait for the asynchronous truncate operation to complete
+						StopWatch sw = new StopWatch();
+						sw.start();
+						long newLen = fs.getFileStatus(partPath).getLen();
+						while (newLen != validLength) {
+							if (sw.getTime() > asyncTimeout) {
+								break;
 							}
-							if (newLen != bucketState.currentFileValidLength) {
-								throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
 							}
+							newLen = fs.getFileStatus(partPath).getLen();
 						}
-					} else {
-						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						Path validLengthFilePath = getValidLengthPathFor(partPath);
-						if (!fs.exists(validLengthFilePath)) {
-							FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-							lengthFileOut.close();
+						if (newLen != validLength) {
+							throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
 						}
 					}
-
-					// Now that we've restored the bucket to a valid state, reset the current file info
-					bucketState.currentFile = null;
-					bucketState.currentFileValidLength = -1;
-					bucketState.isWriterOpen = false;
-				} catch (IOException e) {
-					LOG.error("Error while restoring BucketingSink state.", e);
-					throw new RuntimeException("Error while restoring BucketingSink state.", e);
-				} catch (InvocationTargetException | IllegalAccessException e) {
-					LOG.error("Could not invoke truncate.", e);
-					throw new RuntimeException("Could not invoke truncate.", e);
+				} else {
+					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
+					Path validLengthFilePath = getValidLengthPathFor(partPath);
+					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+						lengthFileOut.writeUTF(Long.toString(validLength));
+						lengthFileOut.close();
+					}
 				}
+
+			} catch (IOException e) {
+				LOG.error("Error while restoring BucketingSink state.", e);
+				throw new RuntimeException("Error while restoring BucketingSink state.", e);
+			} catch (InvocationTargetException | IllegalAccessException e) {
+				LOG.error("Could not invoke truncate.", e);
+				throw new RuntimeException("Could not invoke truncate.", e);
 			}
+		}
+	}
 
-			// Move files that are confirmed by a checkpoint but did not get moved to final location
-			// because the checkpoint notification did not happen before a failure
+	private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
+		// Move files that are confirmed by a checkpoint but did not get moved to final location
+		// because the checkpoint notification did not happen before a failure
 
-			LOG.debug("Moving pending files to final location on restore.");
+		LOG.debug("Moving pending files to final location on restore.");
 
-			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-			for (Long pastCheckpointId : pastCheckpointIds) {
-				// All the pending files are buckets that have been completed but are waiting to be renamed
-				// to their final name
-				for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-					Path finalPath = new Path(filename);
-					Path pendingPath = getPendingPathFor(finalPath);
+		Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
+		for (Long pastCheckpointId : pastCheckpointIds) {
+			// All the pending files are buckets that have been completed but are waiting to be renamed
+			// to their final name
+			for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+				Path finalPath = new Path(filename);
+				Path pendingPath = getPendingPathFor(finalPath);
 
-					try {
-						if (fs.exists(pendingPath)) {
-							LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
-							fs.rename(pendingPath, finalPath);
-						}
-					} catch (IOException e) {
-						LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-						throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
+				try {
+					if (fs.exists(pendingPath)) {
+						LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
+						fs.rename(pendingPath, finalPath);
 					}
+				} catch (IOException e) {
+					LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+					throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
 				}
 			}
+		}
+	}
 
-			synchronized (bucketState.pendingFilesPerCheckpoint) {
-				bucketState.pendingFilesPerCheckpoint.clear();
-			}
+	// --------------------------------------------------------------------------------------------
+	//  Backwards compatibility with Flink 1.1
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(RollingSink.BucketState state) throws Exception {
+		LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
 		}
+
+		handleRestoredRollingSinkState(state);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 0c5e16b..3355fae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.commons.io.FileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
new file mode 100644
index 0000000..257b157
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public class RollingToBucketingMigrationTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Test
+	public void testMigration() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness1.setup();
+		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.close();
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		ValidatingBucketingSink(String basePath) {
+			super(basePath);
+		}
+
+		@Override
+		public void restoreState(RollingSink.BucketState state) throws Exception {
+
+			/**
+			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+			 * 					validLength=6
+			 * pendingForNextCheckpoint=[]
+			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+			 * */
+
+			String current = state.currentFile;
+			long validLength = state.currentFileValidLength;
+
+			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+			Assert.assertEquals(6, validLength);
+
+			List<String> pendingFiles = state.pendingFiles;
+			Assert.assertTrue(pendingFiles.isEmpty());
+
+			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+				long checkpoint = entry.getKey();
+				List<String> files = entry.getValue();
+
+				Assert.assertEquals(0L, checkpoint);
+				Assert.assertEquals(4, files.size());
+
+				for (int i = 0; i < 4; i++) {
+					Assert.assertEquals(
+						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+						files.get(i));
+				}
+			}
+
+			super.restoreState(state);
+		}
+	}
+}