Posted to commits@flink.apache.org by al...@apache.org on 2017/01/13 10:55:21 UTC
[1/2] flink git commit: [FLINK-5443] Migrate from Rolling to Bucketing sink.
Repository: flink
Updated Branches:
refs/heads/master 7a2d3bea9 -> 2bda5e457
[FLINK-5443] Migrate from Rolling to Bucketing sink.
To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change their code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume the program from
that savepoint.
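In code, the migration amounts to swapping the sink class while keeping its
configuration identical; a minimal sketch (the output path, writer and batch
size below are illustrative placeholders, not taken from this commit):

    // Flink 1.1 job whose state was captured in a savepoint:
    RollingSink<String> oldSink = new RollingSink<String>("hdfs:///path/to/out")
        .setWriter(new StringWriter<String>())
        .setBatchSize(1024L * 1024L * 384L);

    // The same job, resumed from that savepoint after replacing the sink
    // with a BucketingSink configured with the same properties:
    BucketingSink<String> newSink = new BucketingSink<String>("hdfs:///path/to/out")
        .setWriter(new StringWriter<String>())
        .setBatchSize(1024L * 1024L * 384L);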
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bda5e45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bda5e45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bda5e45
Branch: refs/heads/master
Commit: 2bda5e457082f1dd05736d0e18b8d3bae4ba7c4e
Parents: bbca329
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 16:52:51 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:46:45 2017 +0100
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 10 +-
.../connectors/fs/bucketing/BucketingSink.java | 278 +++++++++++--------
.../fs/bucketing/RollingSinkMigrationTest.java | 17 ++
.../RollingToBucketingMigrationTest.java | 161 +++++++++++
4 files changed, 348 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 98eb2d4..429d00a 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -905,30 +905,30 @@ public class RollingSink<T> extends RichSinkFunction<T>
* This is used for keeping track of the current in-progress files and files that we mark
* for moving from pending to final location after we get a checkpoint-complete notification.
*/
- static final class BucketState implements Serializable {
+ public static final class BucketState implements Serializable {
private static final long serialVersionUID = 1L;
/**
* The file that was in-progress when the last checkpoint occurred.
*/
- String currentFile;
+ public String currentFile;
/**
* The valid length of the in-progress file at the time of the last checkpoint.
*/
- long currentFileValidLength = -1;
+ public long currentFileValidLength = -1;
/**
* Pending files that accumulated since the last checkpoint.
*/
- List<String> pendingFiles = new ArrayList<>();
+ public List<String> pendingFiles = new ArrayList<>();
/**
* When doing a checkpoint we move the pending files since the last checkpoint to this map
* with the id of the checkpoint. When we get the checkpoint-complete notification we move
* pending files of completed checkpoints to their final location.
*/
- final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
+ public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index e8bff21..7dbcda7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@ import java.util.Iterator;
*/
public class BucketingSink<T>
extends RichSinkFunction<T>
- implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+ implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
+ CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
@@ -344,7 +347,12 @@ public class BucketingSink<T>
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
- initFileSystem();
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+ }
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ public class BucketingSink<T>
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();
- if (bucketState.currentFile != null) {
+ handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
- // We were writing to a file when the last checkpoint occurred. This file can either
- // be still in-progress or became a pending file at some point after the checkpoint.
- // Either way, we have to truncate it back to a valid state (or write a .valid-length
- // file that specifies up to which length it is valid) and rename it to the final name
- // before starting a new bucket file.
+ // Now that we've restored the bucket to a valid state, reset the current file info
+ bucketState.currentFile = null;
+ bucketState.currentFileValidLength = -1;
+ bucketState.isWriterOpen = false;
- Path partPath = new Path(bucketState.currentFile);
- try {
- Path partPendingPath = getPendingPathFor(partPath);
- Path partInProgressPath = getInProgressPathFor(partPath);
-
- if (fs.exists(partPendingPath)) {
- LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
- // has been moved to pending in the mean time, rename to final location
- fs.rename(partPendingPath, partPath);
- } else if (fs.exists(partInProgressPath)) {
- LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
- // it was still in progress, rename to final path
- fs.rename(partInProgressPath, partPath);
- } else if (fs.exists(partPath)) {
- LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
- } else {
- LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
- "it was moved to final location by a previous snapshot restore", bucketState.currentFile);
- }
+ handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
- // We use reflection to get the .truncate() method, this
- // is only available starting with Hadoop 2.7
- if (this.refTruncate == null) {
- this.refTruncate = reflectTruncate(fs);
- }
+ synchronized (bucketState.pendingFilesPerCheckpoint) {
+ bucketState.pendingFilesPerCheckpoint.clear();
+ }
+ }
+ }
+
+ private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
+ restoredState.pendingFiles.clear();
- // truncate it or write a ".valid-length" file to specify up to which point it is valid
- if (refTruncate != null) {
- LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
- // some-one else might still hold the lease from a previous try, we are
- // recovering, after all ...
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
- LOG.debug("Trying to recover file lease {}", partPath);
- dfs.recoverLease(partPath);
- boolean isclosed = dfs.isFileClosed(partPath);
- StopWatch sw = new StopWatch();
- sw.start();
- while (!isclosed) {
- if (sw.getTime() > asyncTimeout) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // ignore it
- }
- isclosed = dfs.isFileClosed(partPath);
+ handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
+
+ // Now that we've restored the bucket to a valid state, reset the current file info
+ restoredState.currentFile = null;
+ restoredState.currentFileValidLength = -1;
+
+ handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
+
+ synchronized (restoredState.pendingFilesPerCheckpoint) {
+ restoredState.pendingFilesPerCheckpoint.clear();
+ }
+ }
+
+ private void handlePendingInProgressFile(String file, long validLength) {
+ if (file != null) {
+
+ // We were writing to a file when the last checkpoint occurred. This file can either
+ // be still in-progress or became a pending file at some point after the checkpoint.
+ // Either way, we have to truncate it back to a valid state (or write a .valid-length
+ // file that specifies up to which length it is valid) and rename it to the final name
+ // before starting a new bucket file.
+
+ Path partPath = new Path(file);
+ try {
+ Path partPendingPath = getPendingPathFor(partPath);
+ Path partInProgressPath = getInProgressPathFor(partPath);
+
+ if (fs.exists(partPendingPath)) {
+ LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+ // has been moved to pending in the mean time, rename to final location
+ fs.rename(partPendingPath, partPath);
+ } else if (fs.exists(partInProgressPath)) {
+ LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+ // it was still in progress, rename to final path
+ fs.rename(partInProgressPath, partPath);
+ } else if (fs.exists(partPath)) {
+ LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
+ } else {
+ LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
+ "it was moved to final location by a previous snapshot restore", file);
+ }
+
+ // We use reflection to get the .truncate() method, this
+ // is only available starting with Hadoop 2.7
+ if (this.refTruncate == null) {
+ this.refTruncate = reflectTruncate(fs);
+ }
+
+ // truncate it or write a ".valid-length" file to specify up to which point it is valid
+ if (refTruncate != null) {
+ LOG.debug("Truncating {} to valid length {}", partPath, validLength);
+ // some-one else might still hold the lease from a previous try, we are
+ // recovering, after all ...
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ LOG.debug("Trying to recover file lease {}", partPath);
+ dfs.recoverLease(partPath);
+ boolean isclosed = dfs.isFileClosed(partPath);
+ StopWatch sw = new StopWatch();
+ sw.start();
+ while (!isclosed) {
+ if (sw.getTime() > asyncTimeout) {
+ break;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // ignore it
}
+ isclosed = dfs.isFileClosed(partPath);
}
- Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
- if (!truncated) {
- LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
- // we must wait for the asynchronous truncate operation to complete
- StopWatch sw = new StopWatch();
- sw.start();
- long newLen = fs.getFileStatus(partPath).getLen();
- while (newLen != bucketState.currentFileValidLength) {
- if (sw.getTime() > asyncTimeout) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // ignore it
- }
- newLen = fs.getFileStatus(partPath).getLen();
+ }
+ Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
+ if (!truncated) {
+ LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+ // we must wait for the asynchronous truncate operation to complete
+ StopWatch sw = new StopWatch();
+ sw.start();
+ long newLen = fs.getFileStatus(partPath).getLen();
+ while (newLen != validLength) {
+ if (sw.getTime() > asyncTimeout) {
+ break;
}
- if (newLen != bucketState.currentFileValidLength) {
- throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // ignore it
}
+ newLen = fs.getFileStatus(partPath).getLen();
}
- } else {
- LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
- Path validLengthFilePath = getValidLengthPathFor(partPath);
- if (!fs.exists(validLengthFilePath)) {
- FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
- lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
- lengthFileOut.close();
+ if (newLen != validLength) {
+ throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
}
}
-
- // Now that we've restored the bucket to a valid state, reset the current file info
- bucketState.currentFile = null;
- bucketState.currentFileValidLength = -1;
- bucketState.isWriterOpen = false;
- } catch (IOException e) {
- LOG.error("Error while restoring BucketingSink state.", e);
- throw new RuntimeException("Error while restoring BucketingSink state.", e);
- } catch (InvocationTargetException | IllegalAccessException e) {
- LOG.error("Could not invoke truncate.", e);
- throw new RuntimeException("Could not invoke truncate.", e);
+ } else {
+ LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
+ Path validLengthFilePath = getValidLengthPathFor(partPath);
+ if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+ FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+ lengthFileOut.writeUTF(Long.toString(validLength));
+ lengthFileOut.close();
+ }
}
+
+ } catch (IOException e) {
+ LOG.error("Error while restoring BucketingSink state.", e);
+ throw new RuntimeException("Error while restoring BucketingSink state.", e);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ LOG.error("Could not invoke truncate.", e);
+ throw new RuntimeException("Could not invoke truncate.", e);
}
+ }
+ }
- // Move files that are confirmed by a checkpoint but did not get moved to final location
- // because the checkpoint notification did not happen before a failure
+ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
+ // Move files that are confirmed by a checkpoint but did not get moved to final location
+ // because the checkpoint notification did not happen before a failure
- LOG.debug("Moving pending files to final location on restore.");
+ LOG.debug("Moving pending files to final location on restore.");
- Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
- for (Long pastCheckpointId : pastCheckpointIds) {
- // All the pending files are buckets that have been completed but are waiting to be renamed
- // to their final name
- for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
- Path finalPath = new Path(filename);
- Path pendingPath = getPendingPathFor(finalPath);
+ Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
+ for (Long pastCheckpointId : pastCheckpointIds) {
+ // All the pending files are buckets that have been completed but are waiting to be renamed
+ // to their final name
+ for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+ Path finalPath = new Path(filename);
+ Path pendingPath = getPendingPathFor(finalPath);
- try {
- if (fs.exists(pendingPath)) {
- LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
- fs.rename(pendingPath, finalPath);
- }
- } catch (IOException e) {
- LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
- throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
+ try {
+ if (fs.exists(pendingPath)) {
+ LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
+ fs.rename(pendingPath, finalPath);
}
+ } catch (IOException e) {
+ LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+ throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
+ }
+ }
- synchronized (bucketState.pendingFilesPerCheckpoint) {
- bucketState.pendingFilesPerCheckpoint.clear();
- }
+ // --------------------------------------------------------------------------------------------
+ // Backwards compatibility with Flink 1.1
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void restoreState(RollingSink.BucketState state) throws Exception {
+ LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink of an older Flink version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}
+
+ handleRestoredRollingSinkState(state);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 0c5e16b..3355fae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.commons.io.FileUtils;
http://git-wip-us.apache.org/repos/asf/flink/blob/2bda5e45/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
new file mode 100644
index 0000000..257b157
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public class RollingToBucketingMigrationTest {
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String PART_PREFIX = "part";
+ private static final String PENDING_SUFFIX = ".pending";
+ private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+ @Test
+ public void testMigration() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(sink), 10, 1, 0);
+ testHarness1.setup();
+ testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.close();
+ }
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+ int inProg = 0;
+ int pend = 0;
+ int compl = 0;
+ int val = 0;
+
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (file.getAbsolutePath().endsWith("crc")) {
+ continue;
+ }
+ String path = file.getPath();
+ if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+ inProg++;
+ } else if (path.endsWith(PENDING_SUFFIX)) {
+ pend++;
+ } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+ val++;
+ } else if (path.contains(PART_PREFIX)) {
+ compl++;
+ }
+ }
+
+ Assert.assertEquals(inprogress, inProg);
+ Assert.assertEquals(pending, pend);
+ Assert.assertEquals(completed, compl);
+ Assert.assertEquals(valid, val);
+ }
+
+ static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+ private static final long serialVersionUID = -4263974081712009141L;
+
+ ValidatingBucketingSink(String basePath) {
+ super(basePath);
+ }
+
+ @Override
+ public void restoreState(RollingSink.BucketState state) throws Exception {
+
+ /**
+ * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+ * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+ * validLength=6
+ * pendingForNextCheckpoint=[]
+ * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+ * */
+
+ String current = state.currentFile;
+ long validLength = state.currentFileValidLength;
+
+ Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+ Assert.assertEquals(6, validLength);
+
+ List<String> pendingFiles = state.pendingFiles;
+ Assert.assertTrue(pendingFiles.isEmpty());
+
+ final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+ Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+ for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+ long checkpoint = entry.getKey();
+ List<String> files = entry.getValue();
+
+ Assert.assertEquals(0L, checkpoint);
+ Assert.assertEquals(4, files.size());
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertEquals(
+ "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+ files.get(i));
+ }
+ }
+
+ super.restoreState(state);
+ }
+ }
+}
[2/2] flink git commit: [FLINK-5318] Make the RollingSink backwards compatible.
Posted by al...@apache.org.
[FLINK-5318] Make the RollingSink backwards compatible.
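The backwards compatibility works by having the sink additionally implement
CheckpointedRestoring<RollingSink.BucketState>, so that Flink hands it the
legacy Flink 1.1 state on restore; in outline, condensed from the diff below
(error handling omitted):

    @Deprecated
    public class RollingSink<T> extends RichSinkFunction<T>
        implements InputTypeConfigurable, CheckpointedFunction,
            CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {

        @Override
        public void restoreState(BucketState state) throws Exception {
            // Invoked only when restoring from a legacy (pre-1.2) checkpoint:
            // re-create the FileSystem, then clean up the in-progress and
            // pending files recorded in the old BucketState.
            initFileSystem();
            handleRestoredBucketState(state);
        }
    }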
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbca3293
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbca3293
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbca3293
Branch: refs/heads/master
Commit: bbca3293ab84e8af124e66fcdfc394bbf8d5954b
Parents: 7a2d3be
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 15:38:28 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:46:45 2017 +0100
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 32 +++-
.../connectors/fs/bucketing/BucketingSink.java | 8 +-
.../fs/bucketing/RollingSinkMigrationTest.java | 183 +++++++++++++++++++
...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes
4 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index fc4a35e..98eb2d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
@@ -128,7 +129,8 @@ import java.util.UUID;
*/
@Deprecated
public class RollingSink<T> extends RichSinkFunction<T>
- implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
+ implements InputTypeConfigurable, CheckpointedFunction,
+ CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> {
private static final long serialVersionUID = 1L;
@@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T>
Preconditions.checkArgument(this.restoredBucketStates == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
- initFileSystem();
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
+ }
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
@@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
} else {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
Path validLengthFilePath = getValidLengthPathFor(partPath);
- if (!fs.exists(validLengthFilePath)) {
+ if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
lengthFileOut.close();
@@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
// --------------------------------------------------------------------------------------------
+ // Backwards compatibility with Flink 1.1
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void restoreState(BucketState state) throws Exception {
+ LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
+ }
+
+ handleRestoredBucketState(state);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Setters for User configuration values
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cf2c373..e8bff21 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,7 +58,7 @@ import java.util.UUID;
import java.util.Iterator;
/**
- * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
+ * Sink that emits its input elements to {@link FileSystem} files within
* buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
*
* <p>
@@ -124,9 +124,9 @@ import java.util.Iterator;
* </li>
* <li>
* The part files are written using an instance of {@link Writer}. By default, a
- * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- * of {@code toString()} for every element, separated by newlines. You can configure the writer using the
- * {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ * {@link StringWriter} is used, which writes the result of {@code toString()} for
+ * every element, separated by newlines. You can configure the writer using the
+ * {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
* can be used to write Hadoop {@code SequenceFiles}.
* </li>
* </ol>
http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
new file mode 100644
index 0000000..0c5e16b
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -0,0 +1,183 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+@Deprecated
+public class RollingSinkMigrationTest {
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String PART_PREFIX = "part";
+ private static final String PENDING_SUFFIX = ".pending";
+ private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+ @Test
+ public void testMigration() throws Exception {
+
+ /*
+ * Code ran to get the snapshot:
+ *
+ * final File outDir = tempFolder.newFolder();
+
+ RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+ testHarness1.setup();
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.processElement(new StreamRecord<>("test3", 0L));
+ testHarness1.processElement(new StreamRecord<>("test4", 0L));
+ testHarness1.processElement(new StreamRecord<>("test5", 0L));
+
+ checkFs(outDir, 1, 4, 0, 0);
+
+ StreamTaskState taskState = testHarness1.snapshot(0, 0);
+ testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
+ testHarness1.close();
+ * */
+
+ final File outDir = tempFolder.newFolder();
+
+ RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(sink), 10, 1, 0);
+ testHarness1.setup();
+ testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.close();
+ }
+
+ private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+ int inProg = 0;
+ int pend = 0;
+ int compl = 0;
+ int val = 0;
+
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (file.getAbsolutePath().endsWith("crc")) {
+ continue;
+ }
+ String path = file.getPath();
+ if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+ inProg++;
+ } else if (path.endsWith(PENDING_SUFFIX)) {
+ pend++;
+ } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+ val++;
+ } else if (path.contains(PART_PREFIX)) {
+ compl++;
+ }
+ }
+
+ Assert.assertEquals(inprogress, inProg);
+ Assert.assertEquals(pending, pend);
+ Assert.assertEquals(completed, compl);
+ Assert.assertEquals(valid, val);
+ }
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ static class ValidatingRollingSink<T> extends RollingSink<T> {
+
+ private static final long serialVersionUID = -4263974081712009141L;
+
+ ValidatingRollingSink(String basePath) {
+ super(basePath);
+ }
+
+ @Override
+ public void restoreState(BucketState state) throws Exception {
+
+ /**
+ * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+ * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+ * validLength=6
+ * pendingForNextCheckpoint=[]
+ * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+ * */
+
+ String current = state.currentFile;
+ long validLength = state.currentFileValidLength;
+
+ Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+ Assert.assertEquals(6, validLength);
+
+ List<String> pendingFiles = state.pendingFiles;
+ Assert.assertTrue(pendingFiles.isEmpty());
+
+ final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+ Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+ for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+ long checkpoint = entry.getKey();
+ List<String> files = entry.getValue();
+
+ Assert.assertEquals(0L, checkpoint);
+ Assert.assertEquals(4, files.size());
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertEquals(
+ "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+ files.get(i));
+ }
+ }
+ super.restoreState(state);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbca3293/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..2ebd70a
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ