You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/13 10:45:51 UTC
[3/4] flink git commit: [FLINK-5443] Migrate from Rolling to
Bucketing sink.
[FLINK-5443] Migrate from Rolling to Bucketing sink.
To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change his code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume his program from
that savepoint.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4c60a94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4c60a94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4c60a94
Branch: refs/heads/release-1.2
Commit: b4c60a942fe07e355dd49ed2aab3c0a7ae94285d
Parents: 078e248
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 6 16:52:51 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 10 +-
.../connectors/fs/bucketing/BucketingSink.java | 278 +++++++++++--------
.../fs/bucketing/RollingSinkMigrationTest.java | 17 ++
.../RollingToBucketingMigrationTest.java | 161 +++++++++++
4 files changed, 348 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 98eb2d4..429d00a 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -905,30 +905,30 @@ public class RollingSink<T> extends RichSinkFunction<T>
* This is used for keeping track of the current in-progress files and files that we mark
* for moving from pending to final location after we get a checkpoint-complete notification.
*/
- static final class BucketState implements Serializable {
+ public static final class BucketState implements Serializable {
private static final long serialVersionUID = 1L;
/**
* The file that was in-progress when the last checkpoint occurred.
*/
- String currentFile;
+ public String currentFile;
/**
* The valid length of the in-progress file at the time of the last checkpoint.
*/
- long currentFileValidLength = -1;
+ public long currentFileValidLength = -1;
/**
* Pending files that accumulated since the last checkpoint.
*/
- List<String> pendingFiles = new ArrayList<>();
+ public List<String> pendingFiles = new ArrayList<>();
/**
* When doing a checkpoint we move the pending files since the last checkpoint to this map
* with the id of the checkpoint. When we get the checkpoint-complete notification we move
* pending files of completed checkpoints to their final location.
*/
- final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
+ public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index e8bff21..7dbcda7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@ import java.util.Iterator;
*/
public class BucketingSink<T>
extends RichSinkFunction<T>
- implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+ implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener,
+ CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
@@ -344,7 +347,12 @@ public class BucketingSink<T>
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
- initFileSystem();
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
+ }
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ public class BucketingSink<T>
// (we re-start from the last **successful** checkpoint)
bucketState.pendingFiles.clear();
- if (bucketState.currentFile != null) {
+ handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);
- // We were writing to a file when the last checkpoint occurred. This file can either
- // be still in-progress or became a pending file at some point after the checkpoint.
- // Either way, we have to truncate it back to a valid state (or write a .valid-length
- // file that specifies up to which length it is valid) and rename it to the final name
- // before starting a new bucket file.
+ // Now that we've restored the bucket to a valid state, reset the current file info
+ bucketState.currentFile = null;
+ bucketState.currentFileValidLength = -1;
+ bucketState.isWriterOpen = false;
- Path partPath = new Path(bucketState.currentFile);
- try {
- Path partPendingPath = getPendingPathFor(partPath);
- Path partInProgressPath = getInProgressPathFor(partPath);
-
- if (fs.exists(partPendingPath)) {
- LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
- // has been moved to pending in the mean time, rename to final location
- fs.rename(partPendingPath, partPath);
- } else if (fs.exists(partInProgressPath)) {
- LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
- // it was still in progress, rename to final path
- fs.rename(partInProgressPath, partPath);
- } else if (fs.exists(partPath)) {
- LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
- } else {
- LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
- "it was moved to final location by a previous snapshot restore", bucketState.currentFile);
- }
+ handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
- // We use reflection to get the .truncate() method, this
- // is only available starting with Hadoop 2.7
- if (this.refTruncate == null) {
- this.refTruncate = reflectTruncate(fs);
- }
+ synchronized (bucketState.pendingFilesPerCheckpoint) {
+ bucketState.pendingFilesPerCheckpoint.clear();
+ }
+ }
+ }
+
+ private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
+ restoredState.pendingFiles.clear();
- // truncate it or write a ".valid-length" file to specify up to which point it is valid
- if (refTruncate != null) {
- LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
- // some-one else might still hold the lease from a previous try, we are
- // recovering, after all ...
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
- LOG.debug("Trying to recover file lease {}", partPath);
- dfs.recoverLease(partPath);
- boolean isclosed = dfs.isFileClosed(partPath);
- StopWatch sw = new StopWatch();
- sw.start();
- while (!isclosed) {
- if (sw.getTime() > asyncTimeout) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // ignore it
- }
- isclosed = dfs.isFileClosed(partPath);
+ handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
+
+ // Now that we've restored the bucket to a valid state, reset the current file info
+ restoredState.currentFile = null;
+ restoredState.currentFileValidLength = -1;
+
+ handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
+
+ synchronized (restoredState.pendingFilesPerCheckpoint) {
+ restoredState.pendingFilesPerCheckpoint.clear();
+ }
+ }
+
+ private void handlePendingInProgressFile(String file, long validLength) {
+ if (file != null) {
+
+ // We were writing to a file when the last checkpoint occurred. This file can either
+ // be still in-progress or became a pending file at some point after the checkpoint.
+ // Either way, we have to truncate it back to a valid state (or write a .valid-length
+ // file that specifies up to which length it is valid) and rename it to the final name
+ // before starting a new bucket file.
+
+ Path partPath = new Path(file);
+ try {
+ Path partPendingPath = getPendingPathFor(partPath);
+ Path partInProgressPath = getInProgressPathFor(partPath);
+
+ if (fs.exists(partPendingPath)) {
+ LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
+ // has been moved to pending in the mean time, rename to final location
+ fs.rename(partPendingPath, partPath);
+ } else if (fs.exists(partInProgressPath)) {
+ LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
+ // it was still in progress, rename to final path
+ fs.rename(partInProgressPath, partPath);
+ } else if (fs.exists(partPath)) {
+ LOG.debug("In-Progress file {} was already moved to final location {}.", file, partPath);
+ } else {
+ LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
+ "it was moved to final location by a previous snapshot restore", file);
+ }
+
+ // We use reflection to get the .truncate() method, this
+ // is only available starting with Hadoop 2.7
+ if (this.refTruncate == null) {
+ this.refTruncate = reflectTruncate(fs);
+ }
+
+ // truncate it or write a ".valid-length" file to specify up to which point it is valid
+ if (refTruncate != null) {
+ LOG.debug("Truncating {} to valid length {}", partPath, validLength);
+ // some-one else might still hold the lease from a previous try, we are
+ // recovering, after all ...
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ LOG.debug("Trying to recover file lease {}", partPath);
+ dfs.recoverLease(partPath);
+ boolean isclosed = dfs.isFileClosed(partPath);
+ StopWatch sw = new StopWatch();
+ sw.start();
+ while (!isclosed) {
+ if (sw.getTime() > asyncTimeout) {
+ break;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // ignore it
}
+ isclosed = dfs.isFileClosed(partPath);
}
- Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
- if (!truncated) {
- LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
- // we must wait for the asynchronous truncate operation to complete
- StopWatch sw = new StopWatch();
- sw.start();
- long newLen = fs.getFileStatus(partPath).getLen();
- while (newLen != bucketState.currentFileValidLength) {
- if (sw.getTime() > asyncTimeout) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // ignore it
- }
- newLen = fs.getFileStatus(partPath).getLen();
+ }
+ Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, validLength);
+ if (!truncated) {
+ LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
+
+ // we must wait for the asynchronous truncate operation to complete
+ StopWatch sw = new StopWatch();
+ sw.start();
+ long newLen = fs.getFileStatus(partPath).getLen();
+ while (newLen != validLength) {
+ if (sw.getTime() > asyncTimeout) {
+ break;
}
- if (newLen != bucketState.currentFileValidLength) {
- throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // ignore it
}
+ newLen = fs.getFileStatus(partPath).getLen();
}
- } else {
- LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
- Path validLengthFilePath = getValidLengthPathFor(partPath);
- if (!fs.exists(validLengthFilePath)) {
- FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
- lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
- lengthFileOut.close();
+ if (newLen != validLength) {
+ throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
}
}
-
- // Now that we've restored the bucket to a valid state, reset the current file info
- bucketState.currentFile = null;
- bucketState.currentFileValidLength = -1;
- bucketState.isWriterOpen = false;
- } catch (IOException e) {
- LOG.error("Error while restoring BucketingSink state.", e);
- throw new RuntimeException("Error while restoring BucketingSink state.", e);
- } catch (InvocationTargetException | IllegalAccessException e) {
- LOG.error("Could not invoke truncate.", e);
- throw new RuntimeException("Could not invoke truncate.", e);
+ } else {
+ LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
+ Path validLengthFilePath = getValidLengthPathFor(partPath);
+ if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
+ FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
+ lengthFileOut.writeUTF(Long.toString(validLength));
+ lengthFileOut.close();
+ }
}
+
+ } catch (IOException e) {
+ LOG.error("Error while restoring BucketingSink state.", e);
+ throw new RuntimeException("Error while restoring BucketingSink state.", e);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ LOG.error("Could not invoke truncate.", e);
+ throw new RuntimeException("Could not invoke truncate.", e);
}
+ }
+ }
- // Move files that are confirmed by a checkpoint but did not get moved to final location
- // because the checkpoint notification did not happen before a failure
+ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pendingFilesPerCheckpoint) {
+ // Move files that are confirmed by a checkpoint but did not get moved to final location
+ // because the checkpoint notification did not happen before a failure
- LOG.debug("Moving pending files to final location on restore.");
+ LOG.debug("Moving pending files to final location on restore.");
- Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
- for (Long pastCheckpointId : pastCheckpointIds) {
- // All the pending files are buckets that have been completed but are waiting to be renamed
- // to their final name
- for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
- Path finalPath = new Path(filename);
- Path pendingPath = getPendingPathFor(finalPath);
+ Set<Long> pastCheckpointIds = pendingFilesPerCheckpoint.keySet();
+ for (Long pastCheckpointId : pastCheckpointIds) {
+ // All the pending files are buckets that have been completed but are waiting to be renamed
+ // to their final name
+ for (String filename : pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+ Path finalPath = new Path(filename);
+ Path pendingPath = getPendingPathFor(finalPath);
- try {
- if (fs.exists(pendingPath)) {
- LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
- fs.rename(pendingPath, finalPath);
- }
- } catch (IOException e) {
- LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
- throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
+ try {
+ if (fs.exists(pendingPath)) {
+ LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
+ fs.rename(pendingPath, finalPath);
}
+ } catch (IOException e) {
+ LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
+ throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
}
}
+ }
+ }
- synchronized (bucketState.pendingFilesPerCheckpoint) {
- bucketState.pendingFilesPerCheckpoint.clear();
- }
+ // --------------------------------------------------------------------------------------------
+ // Backwards compatibility with Flink 1.1
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void restoreState(RollingSink.BucketState state) throws Exception {
+ LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);
+
+ try {
+ initFileSystem();
+ } catch (IOException e) {
+ LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
+ throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}
+
+ handleRestoredRollingSinkState(state);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 0c5e16b..3355fae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.commons.io.FileUtils;
http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
new file mode 100644
index 0000000..257b157
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+public class RollingToBucketingMigrationTest {
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String PART_PREFIX = "part";
+ private static final String PENDING_SUFFIX = ".pending";
+ private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+ private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+ @Test
+ public void testMigration() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+ .setWriter(new StringWriter<String>())
+ .setBatchSize(5)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(sink), 10, 1, 0);
+ testHarness1.setup();
+ testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+ testHarness1.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+ checkFs(outDir, 1, 1, 0, 0);
+
+ testHarness1.close();
+ }
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+ int inProg = 0;
+ int pend = 0;
+ int compl = 0;
+ int val = 0;
+
+ for (File file: FileUtils.listFiles(outDir, null, true)) {
+ if (file.getAbsolutePath().endsWith("crc")) {
+ continue;
+ }
+ String path = file.getPath();
+ if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+ inProg++;
+ } else if (path.endsWith(PENDING_SUFFIX)) {
+ pend++;
+ } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+ val++;
+ } else if (path.contains(PART_PREFIX)) {
+ compl++;
+ }
+ }
+
+ Assert.assertEquals(inprogress, inProg);
+ Assert.assertEquals(pending, pend);
+ Assert.assertEquals(completed, compl);
+ Assert.assertEquals(valid, val);
+ }
+
+ static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+ private static final long serialVersionUID = -4263974081712009141L;
+
+ ValidatingBucketingSink(String basePath) {
+ super(basePath);
+ }
+
+ @Override
+ public void restoreState(RollingSink.BucketState state) throws Exception {
+
+ /**
+ * this validates that we read the state that was checkpointed by the previous version. We expect it to be:
+ * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+ * validLength=6
+ * pendingForNextCheckpoint=[]
+ * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+ * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+ * */
+
+ String current = state.currentFile;
+ long validLength = state.currentFileValidLength;
+
+ Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current);
+ Assert.assertEquals(6, validLength);
+
+ List<String> pendingFiles = state.pendingFiles;
+ Assert.assertTrue(pendingFiles.isEmpty());
+
+ final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+ Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+ for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+ long checkpoint = entry.getKey();
+ List<String> files = entry.getValue();
+
+ Assert.assertEquals(0L, checkpoint);
+ Assert.assertEquals(4, files.size());
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertEquals(
+ "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i,
+ files.get(i));
+ }
+ }
+
+ super.restoreState(state);
+ }
+ }
+}