Posted to commits@flink.apache.org by se...@apache.org on 2018/07/16 20:10:07 UTC

[2/3] flink git commit: [FLINK-9750] [DataStream API] Add new StreamingFileSink on top of the ResumableWriter.
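
For orientation, here is a minimal usage sketch of the new sink, distilled from
the createCustomRescalingTestSink helper in the tests below. It is a sketch
only: "stream" and "outDir" are assumed to exist in the surrounding job, and
only the sink construction mirrors what the patch actually exercises.

    // assumed: DataStream<Tuple2<String, Integer>> stream; File outDir
    StreamingFileSink<Tuple2<String, Integer>> sink =
            new StreamingFileSink<Tuple2<String, Integer>>(
                    new Path(outDir.toURI()), new DefaultBucketFactory<>())
                    .setBucketer(new Bucketer<Tuple2<String, Integer>>() {
                        @Override
                        public String getBucketId(Tuple2<String, Integer> element, Context context) {
                            return element.f0;                  // one bucket per key, as in the tests
                        }
                    })
                    .setEncoder(new SimpleStringEncoder<>())    // line-oriented toString() encoder
                    .setRollingPolicy(
                            DefaultRollingPolicy
                                    .create()
                                    .withMaxPartSize(1024L)             // roll by part size ...
                                    .withRolloverInterval(60_000L)      // ... by part age ...
                                    .withInactivityInterval(60_000L)    // ... or by inactivity
                                    .build())
                    .setBucketCheckInterval(10L);               // how often the rolling timers are checked
    stream.addSink(sink);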

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
new file mode 100644
index 0000000..353ac00
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tests for the {@link BucketStateSerializer}.
+ */
+public class BucketStateSerializerTest {
+
+	private static final String IN_PROGRESS_CONTENT = "writing";
+	private static final String PENDING_CONTENT = "wrote";
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testSerializationEmpty() throws IOException {
+		final File testFolder = tempFolder.newFolder();
+		final FileSystem fs = FileSystem.get(testFolder.toURI());
+		final RecoverableWriter writer = fs.createRecoverableWriter();
+
+		final Path testBucket = new Path(testFolder.getPath(), "test");
+
+		final BucketState bucketState = new BucketState(
+				"test", testBucket, Long.MAX_VALUE, null, new HashMap<>());
+
+		final SimpleVersionedSerializer<BucketState> serializer =
+				new BucketStateSerializer(
+						writer.getResumeRecoverableSerializer(),
+						writer.getCommitRecoverableSerializer()
+				);
+
+		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+		final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+
+		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
+		Assert.assertNull(recoveredState.getCurrentInProgress());
+		Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
+	}
+
+	@Test
+	public void testSerializationOnlyInProgress() throws IOException {
+		final File testFolder = tempFolder.newFolder();
+		final FileSystem fs = FileSystem.get(testFolder.toURI());
+
+		final Path testBucket = new Path(testFolder.getPath(), "test");
+
+		final RecoverableWriter writer = fs.createRecoverableWriter();
+		final RecoverableFsDataOutputStream stream = writer.open(testBucket);
+		stream.write(IN_PROGRESS_CONTENT.getBytes(StandardCharsets.UTF_8));
+
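+		// persist() materializes a ResumeRecoverable for the bytes written so far;
+		// this is what the bucket state keeps for the in-progress file.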
+		final RecoverableWriter.ResumeRecoverable current = stream.persist();
+
+		final BucketState bucketState = new BucketState(
+				"test", testBucket, Long.MAX_VALUE, current, new HashMap<>());
+
+		final SimpleVersionedSerializer<BucketState> serializer =
+				new BucketStateSerializer(
+						writer.getResumeRecoverableSerializer(),
+						writer.getCommitRecoverableSerializer()
+				);
+
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+
+		// close the stream to simulate that writing to this file is over.
+		stream.close();
+
+		final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+
+		Assert.assertEquals(testBucket, recoveredState.getBucketPath());
+
+		FileStatus[] statuses = fs.listStatus(testBucket.getParent());
+		Assert.assertEquals(1L, statuses.length);
+		Assert.assertTrue(
+				statuses[0].getPath().getPath().startsWith(
+						(new Path(testBucket.getParent(), ".test.inprogress")).toString())
+		);
+	}
+
+	@Test
+	public void testSerializationFull() throws IOException {
+		final int noOfTasks = 5;
+
+		final File testFolder = tempFolder.newFolder();
+		final FileSystem fs = FileSystem.get(testFolder.toURI());
+		final RecoverableWriter writer = fs.createRecoverableWriter();
+
+		final Path bucketPath = new Path(testFolder.getPath());
+
+		// pending part files, keyed by checkpoint id
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
+		for (int i = 0; i < noOfTasks; i++) {
+			final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
+			for (int j = 0; j < 2 + i; j++) {
+				final Path part = new Path(bucketPath, "part-" + i + '-' + j);
+
+				final RecoverableFsDataOutputStream stream = writer.open(part);
+				stream.write((PENDING_CONTENT + '-' + j).getBytes(StandardCharsets.UTF_8));
+				recoverables.add(stream.closeForCommit().getRecoverable());
+			}
+			commitRecoverables.put((long) i, recoverables);
+		}
+
+		// in-progress
+		final Path testBucket = new Path(bucketPath, "test-2");
+		final RecoverableFsDataOutputStream stream = writer.open(testBucket);
+		stream.write(IN_PROGRESS_CONTENT.getBytes(StandardCharsets.UTF_8));
+
+		final RecoverableWriter.ResumeRecoverable current = stream.persist();
+
+		final BucketState bucketState = new BucketState(
+				"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
+		final SimpleVersionedSerializer<BucketState> serializer =
+				new BucketStateSerializer(
+						writer.getResumeRecoverableSerializer(),
+						writer.getCommitRecoverableSerializer()
+				);
+		stream.close();
+
+		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+
+		final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+
+		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
+
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+		Assert.assertEquals(5L, recoveredRecoverables.size());
+
+		// recover and commit
+		for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
+			for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
+				writer.recoverForCommit(recoverable).commit();
+			}
+		}
+
+		FileStatus[] filestatuses = fs.listStatus(bucketPath);
+		Set<String> paths = new HashSet<>(filestatuses.length);
+		for (FileStatus filestatus : filestatuses) {
+			paths.add(filestatus.getPath().getPath());
+		}
+
+		for (int i = 0; i < noOfTasks; i++) {
+			for (int j = 0; j < 2 + i; j++) {
+				final String part = new Path(bucketPath, "part-" + i + '-' + j).toString();
+				Assert.assertTrue(paths.contains(part));
+				paths.remove(part);
+			}
+		}
+
+		// only the in-progress file must be left
+		Assert.assertEquals(1L, paths.size());
+
+		// verify that the in-progress file is still there
+		Assert.assertTrue(paths.iterator().next().startsWith(
+				(new Path(testBucket.getParent(), ".test-2.inprogress").toString())));
+	}
+
+	@Test
+	public void testSerializationNullInProgress() throws IOException {
+		final int noOfTasks = 5;
+
+		final File testFolder = tempFolder.newFolder();
+		final FileSystem fs = FileSystem.get(testFolder.toURI());
+		final RecoverableWriter writer = fs.createRecoverableWriter();
+
+		final Path bucketPath = new Path(testFolder.getPath());
+
+		// pending part files, keyed by checkpoint id
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> commitRecoverables = new HashMap<>();
+		for (int i = 0; i < noOfTasks; i++) {
+			final List<RecoverableWriter.CommitRecoverable> recoverables = new ArrayList<>();
+			for (int j = 0; j < 2 + i; j++) {
+				final Path part = new Path(bucketPath, "test-" + i + '-' + j);
+
+				final RecoverableFsDataOutputStream stream = writer.open(part);
+				stream.write((PENDING_CONTENT + '-' + j).getBytes(StandardCharsets.UTF_8));
+				recoverables.add(stream.closeForCommit().getRecoverable());
+			}
+			commitRecoverables.put((long) i, recoverables);
+		}
+
+		final RecoverableWriter.ResumeRecoverable current = null;
+
+		final BucketState bucketState = new BucketState(
+				"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
+
+		final SimpleVersionedSerializer<BucketState> serializer = new BucketStateSerializer(
+				writer.getResumeRecoverableSerializer(),
+				writer.getCommitRecoverableSerializer()
+		);
+
+		byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
+
+		final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+
+		Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
+		Assert.assertNull(recoveredState.getCurrentInProgress());
+
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+		Assert.assertEquals(5L, recoveredRecoverables.size());
+
+		// recover and commit
+		for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry: recoveredRecoverables.entrySet()) {
+			for (RecoverableWriter.CommitRecoverable recoverable: entry.getValue()) {
+				writer.recoverForCommit(recoverable).commit();
+			}
+		}
+
+		FileStatus[] filestatuses = fs.listStatus(bucketPath);
+		Set<String> paths = new HashSet<>(filestatuses.length);
+		for (FileStatus filestatus : filestatuses) {
+			paths.add(filestatus.getPath().getPath());
+		}
+
+		for (int i = 0; i < noOfTasks; i++) {
+			for (int j = 0; j < 2 + i; j++) {
+				final String part = new Path(bucketPath, "test-" + i + '-' + j).toString();
+				Assert.assertTrue(paths.contains(part));
+				paths.remove(part);
+			}
+		}
+
+		// no in-progress file was part of the state, so nothing must be left
+		Assert.assertTrue(paths.isEmpty());
+	}
+}
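
Side note: all four tests above use the same serialize/restore round trip; it
can be condensed into the hypothetical helper below (not part of the patch) to
make the SimpleVersionedSerialization contract explicit.

    // hypothetical helper mirroring the round trip used in the tests above
    private static BucketState roundTrip(
            SimpleVersionedSerializer<BucketState> serializer,
            BucketState state) throws IOException {
        // the serializer version is written ahead of the payload ...
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, state);
        // ... and read back first, so that deserialization can dispatch on it
        return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
    }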

http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
new file mode 100644
index 0000000..b6f73ac
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for the {@link StreamingFileSink}.
+ */
+public class LocalStreamingFileSinkTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testClosingWithoutInput() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+				createRescalingTestSink(outDir, 1, 0, 100L, 124L);
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTruncateAfterRecoveryAndOverwrite() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		OperatorSubtaskState snapshot;
+
+		// we set a small max part size so that we can control when the part file rolls
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+				outDir, 1, 0, 100L, 10L)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			// this creates a new bucket "test1" and part-0-0
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			checkLocalFs(outDir, 1, 0);
+
+			// we take a checkpoint so that we keep the in-progress file offset.
+			snapshot = testHarness.snapshot(1L, 1L);
+
+			// these will close part-0-0 and open part-0-1
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+
+			checkLocalFs(outDir, 2, 0);
+
+			Map<File, String> contents = getFileContentByPath(outDir);
+			int fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : contents.entrySet()) {
+				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@3\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(2L, fileCounter);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+				outDir, 1, 0, 100L, 10L)) {
+
+			testHarness.setup();
+			testHarness.initializeState(snapshot);
+			testHarness.open();
+
+			// both files are still in-progress: the stale part-0-1 was not cleaned up, and part-0-0 was resumed at the checkpointed offset
+			checkLocalFs(outDir, 2, 0);
+
+			// we went back to the first checkpoint, so part-0-0 is truncated and a new part-0-1 will be opened
+			int fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+					// truncated
+					fileCounter++;
+					Assert.assertEquals("test1@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					// ignored for now as we do not clean up. This will be overwritten.
+					fileCounter++;
+					Assert.assertEquals("test1@3\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(2L, fileCounter);
+
+			// this element is appended to the truncated part-0-0; the next one will roll to a new part-0-1
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
+
+			fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					// ignored for now as we do not clean up. This will be overwritten.
+					fileCounter++;
+					Assert.assertEquals("test1@3\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(2L, fileCounter);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
+			checkLocalFs(outDir, 3, 0); // the previous in-progress part-0-1 is simply ignored (it has a random extension)
+
+			testHarness.snapshot(2L, 2L);
+
+			// this will close the new part-0-1
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
+			checkLocalFs(outDir, 3, 0);
+
+			fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					if (fileContents.getValue().equals("test1@5\ntest1@6\n") ||
+							fileContents.getValue().equals("test1@3\n")) {
+						fileCounter++;
+					}
+				}
+			}
+			Assert.assertEquals(3L, fileCounter);
+
+			// this will publish part-0-0
+			testHarness.notifyOfCompletedCheckpoint(2L);
+			checkLocalFs(outDir, 2, 1);
+
+			fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().equals("part-0-0")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					if (fileContents.getValue().equals("test1@5\ntest1@6\n") ||
+							fileContents.getValue().equals("test1@3\n")) {
+						fileCounter++;
+					}
+				}
+			}
+			Assert.assertEquals(3L, fileCounter);
+		}
+	}
+
+	@Test
+	public void testCommitStagedFilesInCorrectOrder() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		// we set a small max part size so that we can control when the part file rolls
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+				outDir, 1, 0, 100L, 10L)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			// these two elements create a new bucket "test1" with a .part-0-0.inprogress file and fill it up
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+			checkLocalFs(outDir, 1, 0);
+
+			// this will open .part-0-1.inprogress
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+			checkLocalFs(outDir, 2, 0);
+
+			// we take a checkpoint so that we keep the in-progress file offset.
+			testHarness.snapshot(1L, 1L);
+
+			// this will close .part-0-1.inprogress
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
+
+			// and open and fill .part-0-2.inprogress
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
+			checkLocalFs(outDir, 3, 0);                    // nothing committed yet
+
+			testHarness.snapshot(2L, 2L);
+
+			// open .part-0-3.inprogress
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 7), 7L));
+			checkLocalFs(outDir, 4, 0);
+
+			// this will close the open part file (the inactivity timer fires past 100ms)
+			testHarness.setProcessingTime(101L);
+
+			testHarness.snapshot(3L, 3L);
+
+			testHarness.notifyOfCompletedCheckpoint(1L);							// the pending files for checkpoint 1 are committed
+			checkLocalFs(outDir, 3, 1);
+
+			int fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().equals("part-0-0")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@3\ntest1@4\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-2.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@5\ntest1@6\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().contains(".part-0-3.inprogress")) {
+					fileCounter++;
+					Assert.assertEquals("test1@7\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(4L, fileCounter);
+
+			testHarness.notifyOfCompletedCheckpoint(3L);							// all pending files for checkpoints 2 and 3 are committed
+			checkLocalFs(outDir, 0, 4);
+
+			fileCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getName().equals("part-0-0")) {
+					fileCounter++;
+					Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().equals("part-0-1")) {
+					fileCounter++;
+					Assert.assertEquals("test1@3\ntest1@4\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().equals("part-0-2")) {
+					fileCounter++;
+					Assert.assertEquals("test1@5\ntest1@6\n", fileContents.getValue());
+				} else if (fileContents.getKey().getName().equals("part-0-3")) {
+					fileCounter++;
+					Assert.assertEquals("test1@7\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(4L, fileCounter);
+		}
+	}
+
+	@Test
+	public void testInactivityPeriodWithLateNotify() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		// we set a large max part size so that part files close on timers, not on size.
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+				outDir, 1, 0, 100L, 124L)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+			checkLocalFs(outDir, 2, 0);
+
+			int bucketCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
+					bucketCounter++;
+				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
+					bucketCounter++;
+				}
+			}
+			Assert.assertEquals(2L, bucketCounter);					// verifies that we have 2 buckets, "test1" and "test2"
+
+			testHarness.setProcessingTime(101L);                                // put them in pending
+			checkLocalFs(outDir, 2, 0);
+
+			testHarness.snapshot(0L, 0L);                // put them in pending for 0
+			checkLocalFs(outDir, 2, 0);
+
+			// create another 2 buckets with 1 in-progress file each
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test4", 1), 1L));
+
+			testHarness.setProcessingTime(202L);                                // put them in pending
+
+			testHarness.snapshot(1L, 0L);                // put them in pending for 1
+			checkLocalFs(outDir, 4, 0);
+
+			testHarness.notifyOfCompletedCheckpoint(0L);            // move the pending files for checkpoint 0 to the "committed" state
+			checkLocalFs(outDir, 2, 2);
+
+			bucketCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
+					bucketCounter++;
+					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("test1@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
+					bucketCounter++;
+					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("test2@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
+					bucketCounter++;
+				} else if (fileContents.getKey().getParentFile().getName().equals("test4")) {
+					bucketCounter++;
+				}
+			}
+			Assert.assertEquals(4L, bucketCounter);
+
+			testHarness.notifyOfCompletedCheckpoint(1L);            // move the pending files for checkpoint 1 to the "committed" state
+			checkLocalFs(outDir, 0, 4);
+
+			bucketCounter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				if (fileContents.getKey().getParentFile().getName().equals("test1")) {
+					bucketCounter++;
+					Assert.assertEquals("test1@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
+					bucketCounter++;
+					Assert.assertEquals("test2@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
+					bucketCounter++;
+					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("test3@1\n", fileContents.getValue());
+				} else if (fileContents.getKey().getParentFile().getName().equals("test4")) {
+					bucketCounter++;
+					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("test4@1\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(4L, bucketCounter);
+		}
+	}
+
+	@Test
+	public void testClosingOnSnapshot() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+					createRescalingTestSink(outDir, 1, 0, 100L, 2L)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.setProcessingTime(0L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+			checkLocalFs(outDir, 2, 0);
+
+			// this is to check the inactivity threshold
+			testHarness.setProcessingTime(101L);
+			checkLocalFs(outDir, 2, 0);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
+			checkLocalFs(outDir, 3, 0);
+
+			testHarness.snapshot(0L, 1L);
+			checkLocalFs(outDir, 3, 0);
+
+			testHarness.notifyOfCompletedCheckpoint(0L);
+			checkLocalFs(outDir, 0, 3);
+
+			testHarness.snapshot(1L, 0L);
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of("test4", 10), 10L));
+			checkLocalFs(outDir, 1, 3);
+		}
+
+		// at close, the remaining in-progress file is not moved to final.
+		checkLocalFs(outDir, 1, 3);
+	}
+
+	@Test
+	public void testScalingDownAndMergingOfStates() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		OperatorSubtaskState mergedSnapshot;
+
+		// we set a small max part size so that the part file rolls.
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createRescalingTestSink(
+						outDir, 2, 0, 100L, 10L);
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createRescalingTestSink(
+						outDir, 2, 1, 100L, 10L)
+		) {
+			testHarness1.setup();
+			testHarness1.open();
+
+			testHarness2.setup();
+			testHarness2.open();
+
+			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
+			checkLocalFs(outDir, 1, 0);
+
+			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+
+			// all the files are in-progress
+			checkLocalFs(outDir, 3, 0);
+
+			int counter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				final String parentFilename = fileContents.getKey().getParentFile().getName();
+				final String inProgressFilename = fileContents.getKey().getName();
+
+				if (parentFilename.equals("test1") &&
+						(
+							inProgressFilename.contains(".part-0-0.inprogress") ||
+							inProgressFilename.contains(".part-1-0.inprogress")
+						)
+				) {
+						counter++;
+				} else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-0.inprogress")) {
+					counter++;
+				}
+			}
+			Assert.assertEquals(3L, counter);
+
+			// we repackage the snapshots of both tasks into one so that the states get redistributed on restore
+			mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+					testHarness1.snapshot(1L, 0L),
+					testHarness2.snapshot(1L, 0L)
+			);
+		}
+
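+		// scale down from parallelism 2 to 1: the single new task restores both old states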
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+						outDir, 1, 0, 100L, 10L)
+		) {
+			testHarness.setup();
+			testHarness.initializeState(mergedSnapshot);
+			testHarness.open();
+
+			// on disk everything is still in-progress, but the restored in-progress file of previous task 1 is now tracked as pending
+			checkLocalFs(outDir, 3, 0);
+
+			testHarness.snapshot(2L, 2L);
+			testHarness.notifyOfCompletedCheckpoint(2L);
+
+			int counter = 0;
+			for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+				final String parentFilename = fileContents.getKey().getParentFile().getName();
+				final String filename = fileContents.getKey().getName();
+
+				if (parentFilename.equals("test1")) {
+					// the check below is lenient because the outcome depends on the order in which the states are consumed during state initialization.
+					if (filename.contains("-0.inprogress") || filename.endsWith("-0")) {
+						counter++;
+						Assert.assertTrue(fileContents.getValue().equals("test1@1\n") || fileContents.getValue().equals("test1@0\n"));
+					}
+				} else if (parentFilename.equals("test2") && filename.contains(".part-1-0.inprogress")) {
+					counter++;
+					Assert.assertEquals("test2@1\n", fileContents.getValue());
+				}
+			}
+			Assert.assertEquals(3L, counter);
+		}
+	}
+
+	@Test
+	public void testMaxCounterUponRecovery() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+
+		OperatorSubtaskState mergedSnapshot;
+
+		final TestBucketFactory first = new TestBucketFactory();
+		final TestBucketFactory second = new TestBucketFactory();
+
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
+						outDir, 2, 0, 100L, 2L, first, new SimpleStringEncoder<>());
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
+						outDir, 2, 1, 100L, 2L, second, new SimpleStringEncoder<>())
+		) {
+			testHarness1.setup();
+			testHarness1.open();
+
+			testHarness2.setup();
+			testHarness2.open();
+
+			// we only put elements in one task.
+			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
+			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
+			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
+			checkLocalFs(outDir, 3, 0);
+
+			// intentionally we snapshot them in the reverse order so that the states are shuffled
+			mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
+					testHarness2.snapshot(0L, 0L),
+					testHarness1.snapshot(0L, 0L)
+			);
+		}
+
+		final TestBucketFactory firstRecovered = new TestBucketFactory();
+		final TestBucketFactory secondRecovered = new TestBucketFactory();
+
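+		// both restored tasks should resume their part counters from the max counter found in the merged state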
+		try (
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
+						outDir, 2, 0, 100L, 2L, firstRecovered, new SimpleStringEncoder<>());
+				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
+						outDir, 2, 1, 100L, 2L, secondRecovered, new SimpleStringEncoder<>())
+		) {
+			testHarness1.setup();
+			testHarness1.initializeState(mergedSnapshot);
+			testHarness1.open();
+
+			// we have to send an element so that the factory updates its counter.
+			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test4", 0), 0L));
+
+			Assert.assertEquals(3L, firstRecovered.getInitialCounter());
+			checkLocalFs(outDir, 1, 3);
+
+			testHarness2.setup();
+			testHarness2.initializeState(mergedSnapshot);
+			testHarness2.open();
+
+			// we have to send an element so that the factory updates its counter.
+			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 0), 0L));
+
+			Assert.assertEquals(3L, secondRecovered.getInitialCounter());
+			checkLocalFs(outDir, 2, 3);
+		}
+	}
+
+	//////////////////////			Helper Methods			//////////////////////
+
+	private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(
+			File outDir,
+			int totalParallelism,
+			int taskIdx,
+			long inactivityInterval,
+			long partMaxSize) throws Exception {
+
+		return createCustomRescalingTestSink(
+				outDir,
+				totalParallelism,
+				taskIdx,
+				inactivityInterval,
+				partMaxSize,
+				new DefaultBucketFactory<>(),
+				(Encoder<Tuple2<String, Integer>>) (element, stream) -> {
+					stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
+					stream.write('\n');
+				});
+	}
+
+	private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(
+			File outDir,
+			int totalParallelism,
+			int taskIdx,
+			long inactivityInterval,
+			long partMaxSize,
+			BucketFactory<Tuple2<String, Integer>> factory,
+			Encoder<Tuple2<String, Integer>> writer) throws Exception {
+
+		StreamingFileSink<Tuple2<String, Integer>> sink = new StreamingFileSink<>(new Path(outDir.toURI()), factory)
+				.setBucketer(new Bucketer<Tuple2<String, Integer>>() {
+
+					private static final long serialVersionUID = -3086487303018372007L;
+
+					@Override
+					public String getBucketId(Tuple2<String, Integer> element, Context context) {
+						return element.f0;
+					}
+				})
+				.setEncoder(writer)
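+				// roll on size, on age, and on inactivity; the test parameters drive all three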
+				.setRollingPolicy(
+						DefaultRollingPolicy
+								.create()
+								.withMaxPartSize(partMaxSize)
+								.withRolloverInterval(inactivityInterval)
+								.withInactivityInterval(inactivityInterval)
+								.build())
+				.setBucketCheckInterval(10L);
+
+		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
+	}
+
+	static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>> {
+
+		private static final long serialVersionUID = 2794824980604027930L;
+
+		private long initialCounter = -1L;
+
+		@Override
+		public Bucket<Tuple2<String, Integer>> getNewBucket(
+				RecoverableWriter fsWriter,
+				int subtaskIndex,
+				String bucketId,
+				Path bucketPath,
+				long initialPartCounter,
+				Encoder<Tuple2<String, Integer>> writer) throws IOException {
+
+			this.initialCounter = initialPartCounter;
+
+			return super.getNewBucket(
+					fsWriter,
+					subtaskIndex,
+					bucketId,
+					bucketPath,
+					initialPartCounter,
+					writer);
+		}
+
+		@Override
+		public Bucket<Tuple2<String, Integer>> restoreBucket(
+				RecoverableWriter fsWriter,
+				int subtaskIndex,
+				long initialPartCounter,
+				Encoder<Tuple2<String, Integer>> writer,
+				BucketState bucketState) throws IOException {
+
+			this.initialCounter = initialPartCounter;
+
+			return super.restoreBucket(
+					fsWriter,
+					subtaskIndex,
+					initialPartCounter,
+					writer,
+					bucketState);
+		}
+
+		public long getInitialCounter() {
+			return initialCounter;
+		}
+	}
+
+	private static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
+		int inProgress = 0;
+		int finished = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+
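+			// by convention, in-progress part files are hidden (dot-prefixed); completed parts keep a plain name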
+			if (file.toPath().getFileName().toString().startsWith(".")) {
+				inProgress++;
+			} else {
+				finished++;
+			}
+		}
+
+		Assert.assertEquals(expectedInProgress, inProgress);
+		Assert.assertEquals(expectedCompleted, finished);
+	}
+
+	private static Map<File, String> getFileContentByPath(File directory) throws IOException {
+		Map<File, String> contents = new HashMap<>(4);
+
+		final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+		for (File file : filesInBucket) {
+			contents.put(file, FileUtils.readFileToString(file));
+		}
+		return contents;
+	}
+}