You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/13 10:45:50 UTC
[2/4] flink git commit: [FLINK-5318] Make the RollingSink backwards
compatible.
[FLINK-5318] Make the RollingSink backwards compatible.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/078e2489
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/078e2489
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/078e2489
Branch: refs/heads/release-1.2
Commit: 078e2489180b7544f2af48afef8147401cd9ebd6
Parents: 2215f82
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 15:38:28 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 32 +++-
.../connectors/fs/bucketing/BucketingSink.java | 8 +-
.../fs/bucketing/RollingSinkMigrationTest.java | 183 +++++++++++++++++++
...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes
4 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index fc4a35e..98eb2d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
@@ -128,7 +129,8 @@ import java.util.UUID;
*/
@Deprecated
public class RollingSink<T> extends RichSinkFunction<T>
- implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
+ implements InputTypeConfigurable, CheckpointedFunction,
+ CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
private static final long serialVersionUID = 1L;
@@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T>
Preconditions.checkArgument(this.restoredBucketStates == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
- initFileSystem();
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+ }
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
@@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
- if (!fs.exists(validLengthFilePath)) {
+ if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
lengthFileOut.close();
@@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
// --------------------------------------------------------------------------------------------
+ // Backwards compatibility with Flink 1.1
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void restoreState(BucketState state) throws Exception {
+ LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+ }
+
+ handleRestoredBucketState(state);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cf2c373..e8bff21 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,7 +58,7 @@ import java.util.UUID;
import java.util.Iterator;
/**
- * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
+ * Sink that emits its input elements to {@link FileSystem} files within
* buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
*
* <p>
@@ -124,9 +124,9 @@ import java.util.Iterator;
* </li>
* <li>
* The part files are written using an instance of {@link Writer}. By default, a
- * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- * of {@code toString()} for every element, separated by newlines. You can configure the writer using the
- * {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ * {@link StringWriter} is used, which writes the result of {@code toString()} for
+ * every element, separated by newlines. You can configure the writer using the
+ * {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
* can be used to write Hadoop {@code SequenceFiles}.
* </li>
* </ol>
http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
new file mode 100644
index 0000000..0c5e16b
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -0,0 +1,183 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+@Deprecated
+public class RollingSinkMigrationTest {
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String PART_PREFIX = "part";
+ private static final String PENDING_SUFFIX = ".pending";
+ private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+ @Test
+ public void testMigration() throws Exception {
+
+ /*
+ * Code ran to get the snapshot:
+ *
+ * final File outDir = tempFolder.newFolder();
+
+ RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness1.setup();
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.processElement(new StreamRecord<>("test3", 0L));
+ testHarness1.processElement(new StreamRecord<>("test4", 0L));
+ testHarness1.processElement(new StreamRecord<>("test5", 0L));
+
+ checkFs(outDir, 1, 4, 0, 0);
+
+ StreamTaskState taskState = testHarness1.snapshot(0, 0);
+ testHarness1.snapshotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
+ testHarness1.close();
+ */
+
+ final File outDir = tempFolder.newFolder();
+
+ RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(sink), 10, 1, 0);
+ testHarness1.setup();
+ testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.close();
+ }
+
+ private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+ int inProg = 0;
+ int pend = 0;
+ int compl = 0;
+ int val = 0;
+
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (file.getAbsolutePath().endsWith("crc")) {
+ continue;
+ }
+ String path = file.getPath();
+ if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+ inProg++;
+ } else if (path.endsWith(PENDING_SUFFIX)) {
+ pend++;
+ } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+ val++;
+ } else if (path.contains(PART_PREFIX)) {
+ compl++;
+ }
+ }
+
+ Assert.assertEquals(inprogress, inProg);
+ Assert.assertEquals(pending, pend);
+ Assert.assertEquals(completed, compl);
+ Assert.assertEquals(valid, val);
+ }
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ static class ValidatingRollingSink<T> extends RollingSink<T> {
+
+ private static final long serialVersionUID = -4263974081712009141L;
+
+ ValidatingRollingSink(String basePath) {
+ super(basePath);
+ }
+
+ @Override
+ public void restoreState(BucketState state) throws Exception {
+
+ /**
+ * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+ * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+ * validLength=6
+ * pendingForNextCheckpoint=[]
+ * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+ * */
+
+ String current = state.currentFile;
+ long validLength = state.currentFileValidLength;
+
+ Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+ Assert.assertEquals(6, validLength);
+
+ List<String> pendingFiles = state.pendingFiles;
+ Assert.assertTrue(pendingFiles.isEmpty());
+
+ final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+ Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+ for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+ long checkpoint = entry.getKey();
+ List<String> files = entry.getValue();
+
+ Assert.assertEquals(0L, checkpoint);
+ Assert.assertEquals(4, files.size());
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertEquals(
+ "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+ files.get(i));
+ }
+ }
+ super.restoreState(state);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..2ebd70a
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ