Posted to commits@flink.apache.org by al...@apache.org on 2017/01/13 10:55:21 UTC

[1/2] flink git commit: [FLINK-5443] Migrate from Rolling to Bucketing sink.

Repository: flink
Updated Branches:
  refs/heads/master 7a2d3bea9 -> 2bda5e457


[FLINK-5443] Migrate from Rolling to Bucketing sink.

To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change their code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume the program from
that savepoint.
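
As a sketch, the user-side change looks like the following (the base
path, batch size, and suffix are illustrative placeholders, not taken
from this commit; they must match whatever the replaced RollingSink
used):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class MigrationSketch {
        // Before: stream.addSink(new RollingSink<String>("/base/path")...)
        // After taking a savepoint, swap in a BucketingSink configured
        // with the same properties, then resume from that savepoint.
        static void attachSink(DataStream<String> stream) {
            BucketingSink<String> sink = new BucketingSink<String>("/base/path")
                .setBatchSize(5)
                .setPendingSuffix(".pending");
            stream.addSink(sink);
        }
    }

The rebuilt job is then resumed from the savepoint in the usual way,
e.g. with bin/flink run -s <savepointPath>, and the restored
RollingSink.BucketState is converted by the restoreState(...) path
added in the diff below.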


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bda5e45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bda5e45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bda5e45

Branch: refs/heads/master
Commit: 2bda5e457082f1dd05736d0e18b8d3bae4ba7c4e
Parents: bbca329
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 16:52:51 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:46:45 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  10 +-
 .../connectors/fs/bucketing/BucketingSink.java  | 278 +++++++++++--------
 .../fs/bucketing/RollingSinkMigrationTest.java  |  17 ++
 .../RollingToBucketingMigrationTest.java        | 161 +++++++++++
 4 files changed, 348 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 98eb2d4..429d00a 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -905,30 +905,30 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	 * This is used for keeping track of the current in-progress files and files that we mark
 	 * for moving from pending to final location after we get a checkpoint-complete notification.
 	 */
-	static final class BucketState implements Serializable {
+	public static final class BucketState implements Serializable {
 		private static final long serialVersionUID = 1L;
 
 		/**
 		 * The file that was in-progress when the last checkpoint occurred.
 		 */
-		String currentFile;
+		public String currentFile;
 
 		/**
 		 * The valid length of the in-progress file at the time of the last checkpoint.
 		 */
-		long currentFileValidLength = -1;
+		public long currentFileValidLength = -1;
 
 		/**
 		 * Pending files that accumulated since the last checkpoint.
 		 */
-		List<String> pendingFiles = new ArrayList<>();
+		public List<String> pendingFiles = new ArrayList<>();
 
 		/**
 		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
 		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
 		 * pending files of completed checkpoints to their final location.
 		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
+		public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
 
 		@Override
 		public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index e8bff21..7dbcda7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@ import java.util.Iterator;
  */
 public class BucketingSink<T>
 		extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
+					CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
 
 	private static final long serialVersionUID = 1L;
 
@@ -344,7 +347,12 @@ public class BucketingSink<T>
 	public void initializeState(FunctionInitializationContext context) throws Exception {
 		Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
 
-		initFileSystem();
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+		}
 
 		if (this.refTruncate == null) {
 			this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ public class BucketingSink<T>
 			// (we re-start from the last **successful** checkpoint)
 			bucketState.pendingFiles.clear();
 
-			if (bucketState.currentFile != null) {
+			handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
 
-				// We were writing to a file when the last checkpoint occurred. This file can either
-				// be still in-progress or became a pending file at some point after the checkpoint.
-				// Either way, we have to truncate it back to a valid state (or write a .valid-length
-				// file that specifies up to which length it is valid) and rename it to the final name
-				// before starting a new bucket file.
+			// Now that we've restored the bucket to a valid state, reset the current file info
+			bucketState.currentFile = null;
+			bucketState.currentFileValidLength = -1;
+			bucketState.isWriterOpen = false;
 
-				Path partPath = new Path(bucketState.currentFile);
-				try {
-					Path partPendingPath = getPendingPathFor(partPath);
-					Path partInProgressPath = getInProgressPathFor(partPath);
-
-					if (fs.exists(partPendingPath)) {
-						LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
-						// has been moved to pending in the mean time, rename to final location
-						fs.rename(partPendingPath, partPath);
-					} else if (fs.exists(partInProgressPath)) {
-						LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
-						// it was still in progress, rename to final path
-						fs.rename(partInProgressPath, partPath);
-					} else if (fs.exists(partPath)) {
-						LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
-					} else {
-						LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
-							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
-					}
+			handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
 
-					// We use reflection to get the .truncate() method, this
-					// is only available starting with Hadoop 2.7
-					if (this.refTruncate == null) {
-						this.refTruncate = reflectTruncate(fs);
-					}
+			synchronized (bucketState.pendingFilesPerCheckpoint) {
+				bucketState.pendingFilesPerCheckpoint.clear();
+			}
+		}
+	}
+
+	private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
+		restoredState.pendingFiles.clear();
 
-					// truncate it or write a ".valid-length" file to specify up to which point it is valid
-					if (refTruncate != null) {
-						LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-						// some-one else might still hold the lease from a previous try, we are
-						// recovering, after all ...
-						if (fs instanceof DistributedFileSystem) {
-							DistributedFileSystem dfs = (DistributedFileSystem) fs;
-							LOG.debug("Trying to recover file lease {}", partPath);
-							dfs.recoverLease(partPath);
-							boolean isclosed = dfs.isFileClosed(partPath);
-							StopWatch sw = new StopWatch();
-							sw.start();
-							while (!isclosed) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								isclosed = dfs.isFileClosed(partPath);
+		handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
+
+		// Now that we've restored the bucket to a valid state, reset the current file info
+		restoredState.currentFile = null;
+		restoredState.currentFileValidLength = -1;
+
+		handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
+
+		synchronized (restoredState.pendingFilesPerCheckpoint) {
+			restoredState.pendingFilesPerCheckpoint.clear();
+		}
+	}
+
+	private void handlePendingInProgressFile(String file, long validLength) {
+		if (file != null) {
+
+			// We were writing to a file when the last checkpoint occurred. This file can either
+			// be still in-progress or became a pending file at some point after the checkpoint.
+			// Either way, we have to truncate it back to a valid state (or write a .valid-length
+			// file that specifies up to which length it is valid) and rename it to the final name
+			// before starting a new bucket file.
+
+			Path partPath = new Path(file);
+			try {
+				Path partPendingPath = getPendingPathFor(partPath);
+				Path partInProgressPath = getInProgressPathFor(partPath);
+
+				if (fs.exists(partPendingPath)) {
+					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+					// has been moved to pending in the mean time, rename to final location
+					fs.rename(partPendingPath, partPath);
+				} else if (fs.exists(partInProgressPath)) {
+					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+					// it was still in progress, rename to final path
+					fs.rename(partInProgressPath, partPath);
+				} else if (fs.exists(partPath)) {
+					LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
+				} else {
+					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
+						"it was moved to final location by a previous snapshot restore", file);
+				}
+
+				// We use reflection to get the .truncate() method, this
+				// is only available starting with Hadoop 2.7
+				if (this.refTruncate == null) {
+					this.refTruncate = reflectTruncate(fs);
+				}
+
+				// truncate it or write a ".valid-length" file to specify up to which point it is valid
+				if (refTruncate != null) {
+					LOG.debug("Truncating {} to valid length {}", partPath, validLength);
+					// some-one else might still hold the lease from a previous try, we are
+					// recovering, after all ...
+					if (fs instanceof DistributedFileSystem) {
+						DistributedFileSystem dfs = (DistributedFileSystem) fs;
+						LOG.debug("Trying to recover file lease {}", partPath);
+						dfs.recoverLease(partPath);
+						boolean isclosed = dfs.isFileClosed(partPath);
+						StopWatch sw = new StopWatch();
+						sw.start();
+						while (!isclosed) {
+							if (sw.getTime() > asyncTimeout) {
+								break;
+							}
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
 							}
+							isclosed = dfs.isFileClosed(partPath);
 						}
-						Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-						if (!truncated) {
-							LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
-							// we must wait for the asynchronous truncate operation to complete
-							StopWatch sw = new StopWatch();
-							sw.start();
-							long newLen = fs.getFileStatus(partPath).getLen();
-							while (newLen != bucketState.currentFileValidLength) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								newLen = fs.getFileStatus(partPath).getLen();
+					}
+					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
+					if (!truncated) {
+						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+						// we must wait for the asynchronous truncate operation to complete
+						StopWatch sw = new StopWatch();
+						sw.start();
+						long newLen = fs.getFileStatus(partPath).getLen();
+						while (newLen != validLength) {
+							if (sw.getTime() > asyncTimeout) {
+								break;
 							}
-							if (newLen != bucketState.currentFileValidLength) {
-								throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
+							try {
+								Thread.sleep(500);
+							} catch (InterruptedException e1) {
+								// ignore it
 							}
+							newLen = fs.getFileStatus(partPath).getLen();
 						}
-					} else {
-						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						Path validLengthFilePath = getValidLengthPathFor(partPath);
-						if (!fs.exists(validLengthFilePath)) {
-							FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-							lengthFileOut.close();
+						if (newLen != validLength) {
+							throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
 						}
 					}
-
-					// Now that we've restored the bucket to a valid state, reset the current file info
-					bucketState.currentFile = null;
-					bucketState.currentFileValidLength = -1;
-					bucketState.isWriterOpen = false;
-				} catch (IOException e) {
-					LOG.error("Error while restoring BucketingSink state.", e);
-					throw new RuntimeException("Error while restoring BucketingSink state.", e);
-				} catch (InvocationTargetException | IllegalAccessException e) {
-					LOG.error("Could not invoke truncate.", e);
-					throw new RuntimeException("Could not invoke truncate.", e);
+				} else {
+					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
+					Path validLengthFilePath = getValidLengthPathFor(partPath);
+					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+						lengthFileOut.writeUTF(Long.toString(validLength));
+						lengthFileOut.close();
+					}
 				}
+
+			} catch (IOException e) {
+				LOG.error("Error while restoring BucketingSink state.", e);
+				throw new RuntimeException("Error while restoring BucketingSink state.", e);
+			} catch (InvocationTargetException | IllegalAccessException e) {
+				LOG.error("Could not invoke truncate.", e);
+				throw new RuntimeException("Could not invoke truncate.", e);
 			}
+		}
+	}
 
-			// Move files that are confirmed by a checkpoint but did not get moved to final location
-			// because the checkpoint notification did not happen before a failure
+	private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
+		// Move files that are confirmed by a checkpoint but did not get moved to final location
+		// because the checkpoint notification did not happen before a failure
 
-			LOG.debug("Moving pending files to final location on restore.");
+		LOG.debug("Moving pending files to final location on restore.");
 
-			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-			for (Long pastCheckpointId : pastCheckpointIds) {
-				// All the pending files are buckets that have been completed but are waiting to be renamed
-				// to their final name
-				for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-					Path finalPath = new Path(filename);
-					Path pendingPath = getPendingPathFor(finalPath);
+		Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
+		for (Long pastCheckpointId : pastCheckpointIds) {
+			// All the pending files are buckets that have been completed but are waiting to be renamed
+			// to their final name
+			for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+				Path finalPath = new Path(filename);
+				Path pendingPath = getPendingPathFor(finalPath);
 
-					try {
-						if (fs.exists(pendingPath)) {
-							LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
-							fs.rename(pendingPath, finalPath);
-						}
-					} catch (IOException e) {
-						LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-						throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
+				try {
+					if (fs.exists(pendingPath)) {
+						LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
+						fs.rename(pendingPath, finalPath);
 					}
+				} catch (IOException e) {
+					LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+					throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
 				}
 			}
+		}
+	}
 
-			synchronized (bucketState.pendingFilesPerCheckpoint) {
-				bucketState.pendingFilesPerCheckpoint.clear();
-			}
+	// --------------------------------------------------------------------------------------------
+	//  Backwards compatibility with Flink 1.1
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(RollingSink.BucketState state) throws Exception {
+		LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink of an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
 		}
+
+		handleRestoredRollingSinkState(state);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 0c5e16b..3355fae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.commons.io.FileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
new file mode 100644
index 0000000..257b157
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public class RollingToBucketingMigrationTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Test
+	public void testMigration() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness1.setup();
+		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.close();
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		ValidatingBucketingSink(String basePath) {
+			super(basePath);
+		}
+
+		@Override
+		public void restoreState(RollingSink.BucketState state) throws Exception {
+
+			/**
+			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+			 * 					validLength=6
+			 * pendingForNextCheckpoint=[]
+			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+			 * */
+
+			String current = state.currentFile;
+			long validLength = state.currentFileValidLength;
+
+			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+			Assert.assertEquals(6, validLength);
+
+			List<String> pendingFiles = state.pendingFiles;
+			Assert.assertTrue(pendingFiles.isEmpty());
+
+			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+				long checkpoint = entry.getKey();
+				List<String> files = entry.getValue();
+
+				Assert.assertEquals(0L, checkpoint);
+				Assert.assertEquals(4, files.size());
+
+				for (int i = 0; i < 4; i++) {
+					Assert.assertEquals(
+						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+						files.get(i));
+				}
+			}
+
+			super.restoreState(state);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-5318] Make the RollingSink backwards compatible.

Posted by al...@apache.org.
[FLINK-5318] Make the RollingSink backwards compatible.
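
The mechanism, visible in the diff below, is the new
CheckpointedRestoring interface: the sink keeps using
CheckpointedFunction for its Flink 1.2-style state and additionally
implements CheckpointedRestoring so the runtime can hand it state
written by Flink 1.1. A minimal sketch of the pattern follows (the
class name and LegacyState are illustrative placeholders; for the
sinks in this commit the real type parameter is
RollingSink.BucketState):

    import java.io.Serializable;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;

    // hypothetical legacy state type, standing in for RollingSink.BucketState
    class LegacyState implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    public class LegacyAwareFunction
            implements CheckpointedFunction, CheckpointedRestoring<LegacyState> {

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // set up (or restore) the new-style Flink 1.2 state
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // write the new-style state on every checkpoint
        }

        @Override
        public void restoreState(LegacyState state) throws Exception {
            // called when the job is resumed from a legacy (Flink 1.1)
            // checkpoint or savepoint; convert the old state into the
            // new representation here
        }
    }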


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbca3293
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbca3293
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbca3293

Branch: refs/heads/master
Commit: bbca3293ab84e8af124e66fcdfc394bbf8d5954b
Parents: 7a2d3be
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 15:38:28 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:46:45 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  32 +++-
 .../connectors/fs/bucketing/BucketingSink.java  |   8 +-
 .../fs/bucketing/RollingSinkMigrationTest.java  | 183 +++++++++++++++++++
 ...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes
 4 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index fc4a35e..98eb2d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
@@ -128,7 +129,8 @@ import java.util.UUID;
  */
 @Deprecated
 public class RollingSink<T> extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
+		implements InputTypeConfigurable, CheckpointedFunction,
+					CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T>
 		Preconditions.checkArgument(this.restoredBucketStates == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
-		initFileSystem();
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+		}
 
 		if (this.refTruncate == null) {
 			this.refTruncate = reflectTruncate(fs);
@@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
 				} else {
 					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
-					if (!fs.exists(validLengthFilePath)) {
+					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
 						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
 						lengthFileOut.close();
@@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T>
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  Backwards compatibility with Flink 1.1
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(BucketState state) throws Exception {
+		LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+		try {
+			initFileSystem();
+		} catch (IOException e) {
+			LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+			throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+		}
+
+		handleRestoredBucketState(state);
+	}
+
+	// --------------------------------------------------------------------------------------------
 	//  Setters for User configuration values
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cf2c373..e8bff21 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,7 +58,7 @@ import java.util.UUID;
 import java.util.Iterator;
 
 /**
- * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
+ * Sink that emits its input elements to {@link FileSystem} files within
  * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
  *
  * <p>
@@ -124,9 +124,9 @@ import java.util.Iterator;
  *     </li>
  *     <li>
  *         The part files are written using an instance of {@link Writer}. By default, a
- *         {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- *         of {@code toString()} for every element, separated by newlines. You can configure the writer using  the
- *         {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ *         {@link StringWriter} is used, which writes the result of {@code toString()} for
+ *         every element, separated by newlines. You can configure the writer using the
+ *         {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
  *         can be used to write Hadoop {@code SequenceFiles}.
  *     </li>
  * </ol>

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
new file mode 100644
index 0000000..0c5e16b
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -0,0 +1,183 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+@Deprecated
+public class RollingSinkMigrationTest {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Test
+	public void testMigration() throws Exception {
+
+		/*
+		* Code run to get the snapshot:
+		*
+		* final File outDir = tempFolder.newFolder();
+
+		RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness1.setup();
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.processElement(new StreamRecord<>("test3", 0L));
+		testHarness1.processElement(new StreamRecord<>("test4", 0L));
+		testHarness1.processElement(new StreamRecord<>("test5", 0L));
+
+		checkFs(outDir, 1, 4, 0, 0);
+
+		StreamTaskState taskState = testHarness1.snapshot(0, 0);
+		testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
+		testHarness1.close();
+		* */
+
+		final File outDir = tempFolder.newFolder();
+
+		RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness1.setup();
+		testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+		testHarness1.open();
+
+		testHarness1.processElement(new StreamRecord<>("test1", 0L));
+		testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness1.close();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	static class ValidatingRollingSink<T> extends RollingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		ValidatingRollingSink(String basePath) {
+			super(basePath);
+		}
+
+		@Override
+		public void restoreState(BucketState state) throws Exception {
+
+			/**
+			 * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+			 * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+			 * 					validLength=6
+			 * pendingForNextCheckpoint=[]
+			 * pendingForPrevCheckpoints={0=[	/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+			 * 									/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+			 * */
+
+			String current = state.currentFile;
+			long validLength = state.currentFileValidLength;
+
+			Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+			Assert.assertEquals(6, validLength);
+
+			List<String> pendingFiles = state.pendingFiles;
+			Assert.assertTrue(pendingFiles.isEmpty());
+
+			final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+			Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+			for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+				long checkpoint = entry.getKey();
+				List<String> files = entry.getValue();
+
+				Assert.assertEquals(0L, checkpoint);
+				Assert.assertEquals(4, files.size());
+
+				for (int i = 0; i < 4; i++) {
+					Assert.assertEquals(
+						"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+						files.get(i));
+				}
+			}
+			super.restoreState(state);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..2ebd70a
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ