Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/01 08:04:00 UTC

[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

    [ https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671233#comment-16671233 ] 

ASF GitHub Bot commented on FLINK-10097:
----------------------------------------

kl0u closed pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 6187e6853dd..65a7628578c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
@@ -26,6 +27,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -59,10 +62,11 @@
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
+	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;
 
 	private long partCounter;
 
+	@Nullable
 	private PartFileWriter<IN, BucketID> inProgressPart;
 
 	private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
@@ -88,6 +92,7 @@ private Bucket(
 		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 
 		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
+		this.pendingPartsPerCheckpoint = new HashMap<>();
 	}
 
 	/**
@@ -277,6 +282,24 @@ void onProcessingTime(long timestamp) throws IOException {
 		}
 	}
 
+	// --------------------------- Testing Methods -----------------------------
+
+	@VisibleForTesting
+	Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
+		return pendingPartsPerCheckpoint;
+	}
+
+	@Nullable
+	@VisibleForTesting
+	PartFileWriter<IN, BucketID> getInProgressPart() {
+		return inProgressPart;
+	}
+
+	@VisibleForTesting
+	List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
+		return pendingPartsForCurrentCheckpoint;
+	}
+
 	// --------------------------- Static Factory Methods -----------------------------
 
 	/**
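
These package-private getters follow the usual @VisibleForTesting convention: they widen access for tests in the same package without extending the public API. The @Nullable one additionally tells test code that null is a legal return value. A minimal caller-side sketch of what that implies (hypothetical helper, not part of the patch):

	// Returns true while the bucket has an open in-progress part file;
	// getInProgressPart() legitimately returns null, e.g. right after a roll.
	static boolean hasOpenPartFile(final Bucket<String, String> bucket) {
		return bucket.getInProgressPart() != null;
	}
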
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 2aca841f16d..d08bc2ac0c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -186,6 +187,10 @@ private void handleRestoredBucketState(final BucketState<BucketID> recoveredStat
 	}
 
 	private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
+		if (!restoredBucket.isActive()) {
+			return;
+		}
+
 		final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
 		if (bucket != null) {
 			bucket.merge(restoredBucket);
@@ -224,6 +229,9 @@ void snapshotState(
 		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
 				subtaskIndex, checkpointId, maxPartCounter);
 
+		bucketStatesContainer.clear();
+		partCounterStateContainer.clear();
+
 		snapshotActiveBuckets(checkpointId, bucketStatesContainer);
 		partCounterStateContainer.add(maxPartCounter);
 	}
@@ -341,4 +349,16 @@ public Long timestamp() {
 			return elementTimestamp;
 		}
 	}
+
+	// --------------------------- Testing Methods -----------------------------
+
+	@VisibleForTesting
+	public long getMaxPartCounter() {
+		return maxPartCounter;
+	}
+
+	@VisibleForTesting
+	Map<BucketID, Bucket<IN, BucketID>> getActiveBuckets() {
+		return activeBuckets;
+	}
 }
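
The new isActive() guard keeps restored buckets that have nothing left to track out of activeBuckets. The method body is not part of this diff; presumably it amounts to the following sketch:

	// A bucket is still active while it holds an open part file or any
	// uncommitted pending files (assumed implementation; not shown in the diff).
	boolean isActive() {
		return inProgressPart != null
				|| !pendingPartsForCurrentCheckpoint.isEmpty()
				|| !pendingPartsPerCheckpoint.isEmpty();
	}
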
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 6f57fee81f3..dc0b1c6e8a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -344,9 +344,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");
 
-		bucketStates.clear();
-		maxPartCountersState.clear();
-
 		buckets.snapshotState(
 				context.getCheckpointId(),
 				bucketStates,
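
The clear() calls removed here are the counterpart of the ones added to Buckets#snapshotState above: clearing and repopulating the state containers now happen in one place, so the clear and the subsequent write can no longer be separated at the caller. The resulting pattern, condensed from the diff (logging omitted):

	void snapshotState(
			final long checkpointId,
			final ListState<byte[]> bucketStatesContainer,
			final ListState<Long> partCounterStateContainer) throws Exception {

		// wipe the containers first, then write the fresh snapshot
		bucketStatesContainer.clear();
		partCounterStateContainer.clear();

		snapshotActiveBuckets(checkpointId, bucketStatesContainer);
		partCounterStateContainer.add(maxPartCounter);
	}
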
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index 7c75f1c5e25..f890326d147 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -78,8 +78,8 @@ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN elemen
 
 	@Override
 	public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
-		return currentTime - partFileState.getCreationTime() > rolloverInterval ||
-				currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+		return currentTime - partFileState.getCreationTime() >= rolloverInterval ||
+				currentTime - partFileState.getLastUpdateTime() >= inactivityInterval;
 	}
 
 	/**
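
Switching from > to >= makes both rollover boundaries inclusive: a processing-time timer that fires exactly rolloverInterval (or inactivityInterval) after the relevant timestamp now triggers a roll instead of waiting for the next firing. A worked example with hypothetical values:

	final long rolloverInterval = 60_000L;
	final long creationTime = 0L;
	final long currentTime = 60_000L; // the timer fires exactly one interval later

	// before: 60_000 - 0 >  60_000 -> false, no roll at the exact boundary
	// after:  60_000 - 0 >= 60_000 -> true, rolls as soon as the interval elapses
	final boolean shouldRoll = currentTime - creationTime >= rolloverInterval;
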
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 25622d14466..aee362178a7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -19,17 +19,26 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests for {@link Buckets}.
@@ -40,46 +49,287 @@
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
 	@Test
-	public void testContextPassingNormalExecution() throws Exception {
-		testCorrectPassingOfContext(1L, 2L, 3L);
-	}
+	public void testSnapshotAndRestore() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
 
-	@Test
-	public void testContextPassingNullTimestamp() throws Exception {
-		testCorrectPassingOfContext(null, 2L, 3L);
-	}
+		final RollingPolicy<String, String> onCheckpointRollingPolicy = OnCheckpointRollingPolicy.build();
 
-	private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception {
-		final File outDir = TEMP_FOLDER.newFolder();
+		final Buckets<String, String> buckets = createBuckets(path, onCheckpointRollingPolicy, 0);
+
+		final ListState<byte[]> bucketStateContainer = new MockListState<>();
+		final ListState<Long> partCounterContainer = new MockListState<>();
+
+		buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+		buckets.snapshotState(0L, bucketStateContainer, partCounterContainer);
+
+		assertThat(buckets.getActiveBuckets().get("test1"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
+
+		buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 2L));
+		buckets.snapshotState(1L, bucketStateContainer, partCounterContainer);
+
+		assertThat(buckets.getActiveBuckets().get("test1"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
+		assertThat(buckets.getActiveBuckets().get("test2"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test2"));
+
+		Buckets<String, String> restoredBuckets =
+				restoreBuckets(path, onCheckpointRollingPolicy, 0, bucketStateContainer, partCounterContainer);
 
-		final Long expectedTimestamp = timestamp;
-		final long expectedWatermark = watermark;
-		final long expectedProcessingTime = processingTime;
+		final Map<String, Bucket<String, String>> activeBuckets = restoredBuckets.getActiveBuckets();
 
-		final Buckets<String, String> buckets = StreamingFileSink
-				.<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
-				.withBucketAssigner(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
-				.createBuckets(2);
+		// because we commit pending files for previous checkpoints upon recovery
+		Assert.assertTrue(activeBuckets.isEmpty());
+	}
 
-		buckets.onElement("TEST", new SinkFunction.Context() {
+	private static TypeSafeMatcher<Bucket<String, String>> hasSinglePartFileToBeCommittedOnCheckpointAck(final Path testTmpPath, final String bucketId) {
+		return new TypeSafeMatcher<Bucket<String, String>>() {
 			@Override
-			public long currentProcessingTime() {
-				return expectedProcessingTime;
+			protected boolean matchesSafely(Bucket<String, String> bucket) {
+				return bucket.getBucketId().equals(bucketId) &&
+						bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
+						bucket.getInProgressPart() == null &&
+						bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
+						bucket.getPendingPartsPerCheckpoint().size() == 1;
 			}
 
 			@Override
-			public long currentWatermark() {
-				return expectedWatermark;
+			public void describeTo(Description description) {
+				description.appendText("a Bucket with a single pending part file @ ")
+						.appendValue(new Path(testTmpPath, bucketId))
+						.appendText("'");
 			}
+		};
+	}
 
-			@Override
-			public Long timestamp() {
-				return expectedTimestamp;
+	@Test
+	public void testMergeAtScaleInAndMaxCounterAtRecovery() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final RollingPolicy<String, String> onCheckpointRP =
+				DefaultRollingPolicy
+						.create()
+						.withMaxPartSize(7L) // roll with 2 elements
+						.build();
+
+		final MockListState<byte[]> bucketStateContainerOne = new MockListState<>();
+		final MockListState<byte[]> bucketStateContainerTwo = new MockListState<>();
+
+		final MockListState<Long> partCounterContainerOne = new MockListState<>();
+		final MockListState<Long> partCounterContainerTwo = new MockListState<>();
+
+		final Buckets<String, String> bucketsOne = createBuckets(path, onCheckpointRP, 0);
+		final Buckets<String, String> bucketsTwo = createBuckets(path, onCheckpointRP, 1);
+
+		bucketsOne.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+		bucketsOne.snapshotState(0L, bucketStateContainerOne, partCounterContainerOne);
+
+		Assert.assertEquals(1L, bucketsOne.getMaxPartCounter());
+
+		// make sure we have one in-progress file here
+		Assert.assertNotNull(bucketsOne.getActiveBuckets().get("test1").getInProgressPart());
+
+		// add a couple of in-progress files so that the part counter increases.
+		bucketsTwo.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+		bucketsTwo.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+
+		bucketsTwo.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+
+		bucketsTwo.snapshotState(0L, bucketStateContainerTwo, partCounterContainerTwo);
+
+		Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
+
+		// make sure we have one in-progress file here and one pending file
+		Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
+		Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
+
+		final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
+		final ListState<Long> mergedPartCounterContainer = new MockListState<>();
+
+		mergedBucketStateContainer.addAll(bucketStateContainerOne.getBackingList());
+		mergedBucketStateContainer.addAll(bucketStateContainerTwo.getBackingList());
+
+		mergedPartCounterContainer.addAll(partCounterContainerOne.getBackingList());
+		mergedPartCounterContainer.addAll(partCounterContainerTwo.getBackingList());
+
+		final Buckets<String, String> restoredBuckets =
+				restoreBuckets(path, onCheckpointRP, 0, mergedBucketStateContainer, mergedPartCounterContainer);
+
+		// we get the maximum of the previous tasks
+		Assert.assertEquals(2L, restoredBuckets.getMaxPartCounter());
+
+		final Map<String, Bucket<String, String>> activeBuckets = restoredBuckets.getActiveBuckets();
+		Assert.assertEquals(1L, activeBuckets.size());
+		Assert.assertTrue(activeBuckets.keySet().contains("test1"));
+
+		final Bucket<String, String> bucket = activeBuckets.get("test1");
+		Assert.assertEquals("test1", bucket.getBucketId());
+		Assert.assertEquals(new Path(path, "test1"), bucket.getBucketPath());
+
+		Assert.assertNotNull(bucket.getInProgressPart()); // the restored part file
+
+		// this is due to Bucket#merge(). The in-progress file of one
+		// of the previous tasks is put in the list of pending files.
+		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
+
+		// we commit the pending files for previous checkpoints
+		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+	}
+
+	@Test
+	public void testOnProcessingTime() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
+				new OnProcessingTimePolicy<>(2L);
+
+		final Buckets<String, String> buckets =
+				createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
+
+		// it takes the current processing time of the context as the creation time
+		// and as the last modification time.
+		buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
+
+		// now it should roll
+		buckets.onProcessingTime(7L);
+		Assert.assertEquals(1L, rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
+
+		final Map<String, Bucket<String, String>> activeBuckets = buckets.getActiveBuckets();
+		Assert.assertEquals(1L, activeBuckets.size());
+		Assert.assertTrue(activeBuckets.keySet().contains("test"));
+
+		final Bucket<String, String> bucket = activeBuckets.get("test");
+		Assert.assertEquals("test", bucket.getBucketId());
+		Assert.assertEquals(new Path(path, "test"), bucket.getBucketPath());
+		Assert.assertEquals("test", bucket.getBucketId());
+
+		Assert.assertNull(bucket.getInProgressPart());
+		Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
+		Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+	}
+
+	@Test
+	public void testBucketIsRemovedWhenNotActive() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
+				new OnProcessingTimePolicy<>(2L);
+
+		final Buckets<String, String> buckets =
+				createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
+
+		// it takes the current processing time of the context as the creation time and the last modification time.
+		buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
+
+		// now it should roll
+		buckets.onProcessingTime(7L);
+		Assert.assertEquals(1L, rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
+
+		buckets.snapshotState(0L, new MockListState<>(), new MockListState<>());
+		buckets.commitUpToCheckpoint(0L);
+
+		Assert.assertTrue(buckets.getActiveBuckets().isEmpty());
+	}
+
+	@Test
+	public void testPartCounterAfterBucketResurrection() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
+				new OnProcessingTimePolicy<>(2L);
+
+		final Buckets<String, String> buckets =
+				createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);
+
+		// it takes the current processing time of the context as the creation time and the last modification time.
+		buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
+		Assert.assertEquals(1L, buckets.getActiveBuckets().get("test").getPartCounter());
+
+		// now it should roll
+		buckets.onProcessingTime(7L);
+		Assert.assertEquals(1L, rollOnProcessingTimeCountingPolicy.getOnProcessingTimeRollCounter());
+		Assert.assertEquals(1L, buckets.getActiveBuckets().get("test").getPartCounter());
+
+		buckets.snapshotState(0L, new MockListState<>(), new MockListState<>());
+		buckets.commitUpToCheckpoint(0L);
+
+		Assert.assertTrue(buckets.getActiveBuckets().isEmpty());
+
+		buckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L , 4L));
+		Assert.assertEquals(2L, buckets.getActiveBuckets().get("test").getPartCounter());
+	}
+
+	private static class OnProcessingTimePolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
+
+		private static final long serialVersionUID = 1L;
+
+		private int onProcessingTimeRollCounter = 0;
+
+		private final long rolloverInterval;
+
+		OnProcessingTimePolicy(final long rolloverInterval) {
+			this.rolloverInterval = rolloverInterval;
+		}
+
+		public int getOnProcessingTimeRollCounter() {
+			return onProcessingTimeRollCounter;
+		}
+
+		@Override
+		public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+			return false;
+		}
+
+		@Override
+		public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
+			return false;
+		}
+
+		@Override
+		public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
+			boolean result = currentTime - partFileState.getCreationTime() >= rolloverInterval;
+			if (result) {
+				onProcessingTimeRollCounter++;
 			}
-		});
+			return result;
+		}
 	}
 
-	private static class VarifyingBucketer implements BucketAssigner<String, String> {
+	@Test
+	public void testContextPassingNormalExecution() throws Exception {
+		testCorrectTimestampPassingInContext(1L, 2L, 3L);
+	}
+
+	@Test
+	public void testContextPassingNullTimestamp() throws Exception {
+		testCorrectTimestampPassingInContext(null, 2L, 3L);
+	}
+
+	private void testCorrectTimestampPassingInContext(Long timestamp, long watermark, long processingTime) throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final Buckets<String, String> buckets = new Buckets<>(
+				path,
+				new VerifyingBucketAssigner(timestamp, watermark, processingTime),
+				new DefaultBucketFactoryImpl<>(),
+				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				DefaultRollingPolicy.create().build(),
+				2
+		);
+
+		buckets.onElement(
+				"test",
+				new TestUtils.MockSinkContext(
+						timestamp,
+						watermark,
+						processingTime)
+		);
+	}
+
+	private static class VerifyingBucketAssigner implements BucketAssigner<String, String> {
 
 		private static final long serialVersionUID = 7729086510972377578L;
 
@@ -87,7 +337,7 @@ public Long timestamp() {
 		private final long expectedWatermark;
 		private final long expectedProcessingTime;
 
-		VarifyingBucketer(
+		VerifyingBucketAssigner(
 				final Long expectedTimestamp,
 				final long expectedWatermark,
 				final long expectedProcessingTime
@@ -98,7 +348,7 @@ public Long timestamp() {
 		}
 
 		@Override
-		public String getBucketId(String element, Context context) {
+		public String getBucketId(String element, BucketAssigner.Context context) {
 			final Long elementTimestamp = context.timestamp();
 			final long watermark = context.currentWatermark();
 			final long processingTime = context.currentProcessingTime();
@@ -115,4 +365,35 @@ public String getBucketId(String element, Context context) {
 			return SimpleVersionedStringSerializer.INSTANCE;
 		}
 	}
+
+	// ------------------------------- Utility Methods --------------------------------
+
+	private static Buckets<String, String> createBuckets(
+			final Path basePath,
+			final RollingPolicy<String, String> rollingPolicy,
+			final int subtaskIdx
+	) throws IOException {
+
+		return new Buckets<>(
+				basePath,
+				new TestUtils.StringIdentityBucketAssigner(),
+				new DefaultBucketFactoryImpl<>(),
+				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				rollingPolicy,
+				subtaskIdx
+		);
+	}
+
+	private static Buckets<String, String> restoreBuckets(
+			final Path basePath,
+			final RollingPolicy<String, String> rollingPolicy,
+			final int subtaskIdx,
+			final ListState<byte[]> bucketState,
+			final ListState<Long> partCounterState
+	) throws Exception {
+
+		final Buckets<String, String> restoredBuckets = createBuckets(basePath, rollingPolicy, subtaskIdx);
+		restoredBuckets.initializeState(bucketState, partCounterState);
+		return restoredBuckets;
+	}
 }
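
The hasSinglePartFileToBeCommittedOnCheckpointAck matcher above follows the standard hamcrest TypeSafeMatcher pattern: matchesSafely() carries the assertion, describeTo() the failure message. A minimal standalone sketch of the same pattern, assuming only hamcrest on the classpath (the names are illustrative):

	import org.hamcrest.Description;
	import org.hamcrest.TypeSafeMatcher;

	import static org.hamcrest.MatcherAssert.assertThat;

	class MatcherSketch {

		static TypeSafeMatcher<String> hasLength(final int expected) {
			return new TypeSafeMatcher<String>() {
				@Override
				protected boolean matchesSafely(String item) {
					return item.length() == expected;
				}

				@Override
				public void describeTo(Description description) {
					// printed when the match fails
					description.appendText("a String of length ").appendValue(expected);
				}
			};
		}

		public static void main(String[] args) {
			assertThat("flink", hasLength(5)); // passes; a mismatch would print describeTo()
		}
	}
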
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 8bb35ff244a..9c3f946829b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -35,7 +31,6 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -494,131 +489,4 @@ public void testScalingDownAndMergingOfStates() throws Exception {
 			Assert.assertEquals(3L, counter);
 		}
 	}
-
-	@Test
-	public void testMaxCounterUponRecovery() throws Exception {
-		final File outDir = TEMP_FOLDER.newFolder();
-
-		OperatorSubtaskState mergedSnapshot;
-
-		final TestBucketFactoryImpl first = new TestBucketFactoryImpl();
-		final TestBucketFactoryImpl second = new TestBucketFactoryImpl();
-
-		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
-				.create()
-				.withMaxPartSize(2L)
-				.withRolloverInterval(100L)
-				.withInactivityInterval(100L)
-				.build();
-
-		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
-						outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, first);
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
-						outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, second)
-		) {
-			testHarness1.setup();
-			testHarness1.open();
-
-			testHarness2.setup();
-			testHarness2.open();
-
-			// we only put elements in one task.
-			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
-			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
-			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
-			TestUtils.checkLocalFs(outDir, 3, 0);
-
-			// intentionally we snapshot them in the reverse order so that the states are shuffled
-			mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
-					testHarness2.snapshot(0L, 0L),
-					testHarness1.snapshot(0L, 0L)
-			);
-		}
-
-		final TestBucketFactoryImpl firstRecovered = new TestBucketFactoryImpl();
-		final TestBucketFactoryImpl secondRecovered = new TestBucketFactoryImpl();
-
-		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
-						outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, firstRecovered);
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
-						outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, secondRecovered)
-		) {
-			testHarness1.setup();
-			testHarness1.initializeState(mergedSnapshot);
-			testHarness1.open();
-
-			// we have to send an element so that the factory updates its counter.
-			testHarness1.processElement(new StreamRecord<>(Tuple2.of("test4", 0), 0L));
-
-			Assert.assertEquals(3L, firstRecovered.getInitialCounter());
-			TestUtils.checkLocalFs(outDir, 1, 3);
-
-			testHarness2.setup();
-			testHarness2.initializeState(mergedSnapshot);
-			testHarness2.open();
-
-			// we have to send an element so that the factory updates its counter.
-			testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 0), 0L));
-
-			Assert.assertEquals(3L, secondRecovered.getInitialCounter());
-			TestUtils.checkLocalFs(outDir, 2, 3);
-		}
-	}
-
-	//////////////////////			Helper Methods			//////////////////////
-
-	static class TestBucketFactoryImpl extends DefaultBucketFactoryImpl<Tuple2<String, Integer>, String> {
-
-		private static final long serialVersionUID = 2794824980604027930L;
-
-		private long initialCounter = -1L;
-
-		@Override
-		public Bucket<Tuple2<String, Integer>, String> getNewBucket(
-				final RecoverableWriter fsWriter,
-				final int subtaskIndex,
-				final String bucketId,
-				final Path bucketPath,
-				final long initialPartCounter,
-				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
-				final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy) {
-
-			this.initialCounter = initialPartCounter;
-
-			return super.getNewBucket(
-					fsWriter,
-					subtaskIndex,
-					bucketId,
-					bucketPath,
-					initialPartCounter,
-					partFileWriterFactory,
-					rollingPolicy);
-		}
-
-		@Override
-		public Bucket<Tuple2<String, Integer>, String> restoreBucket(
-				final RecoverableWriter fsWriter,
-				final int subtaskIndex,
-				final long initialPartCounter,
-				final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
-				final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy,
-				final BucketState<String> bucketState) throws IOException {
-
-			this.initialCounter = initialPartCounter;
-
-			return super.restoreBucket(
-					fsWriter,
-					subtaskIndex,
-					initialPartCounter,
-					partFileWriterFactory,
-					rollingPolicy,
-					bucketState);
-		}
-
-		public long getInitialCounter() {
-			return initialCounter;
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 851b6825d9a..1f4c7e5b20a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -19,13 +19,11 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -33,11 +31,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.either;
-import static org.hamcrest.CoreMatchers.equalTo;
 
 /**
  * Tests for different {@link RollingPolicy rolling policies}.
@@ -50,233 +43,233 @@
 	@Test
 	public void testDefaultRollingPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
 
-		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
-				.create()
-				.withMaxPartSize(10L)
-				.withInactivityInterval(4L)
-				.withRolloverInterval(11L)
-				.build();
-
-		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink(
-						outDir,
-						1,
-						0,
-						1L,
-						new TestUtils.TupleToStringBucketer(),
-						new SimpleStringEncoder<>(),
-						rollingPolicy,
-						new DefaultBucketFactoryImpl<>())
-		) {
-			testHarness.setup();
-			testHarness.open();
-
-			testHarness.setProcessingTime(0L);
+		final RollingPolicy<String, String> originalRollingPolicy =
+				DefaultRollingPolicy
+						.create()
+						.withMaxPartSize(10L)
+						.withInactivityInterval(4L)
+						.withRolloverInterval(11L)
+						.build();
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
-			TestUtils.checkLocalFs(outDir, 1, 0);
+		final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
+				new MethodCallCountingPolicyWrapper<>(originalRollingPolicy);
 
-			// roll due to size
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
-			TestUtils.checkLocalFs(outDir, 1, 0);
+		final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
-			TestUtils.checkLocalFs(outDir, 2, 0);
+		rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);
 
-			// roll due to inactivity
-			testHarness.setProcessingTime(7L);
+		// these two elements will fill up the first in-progress file, and at the third one it will roll ...
+		buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 1L));
+		buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
+		rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
-			TestUtils.checkLocalFs(outDir, 3, 0);
+		buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);
 
-			// roll due to rollover interval
-			testHarness.setProcessingTime(20L);
+		// still not time to roll
+		buckets.onProcessingTime(5L);
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
-			TestUtils.checkLocalFs(outDir, 4, 0);
+		// roll due to inactivity
+		buckets.onProcessingTime(7L);
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L);
 
-			// we take a checkpoint but we should not roll.
-			testHarness.snapshot(1L, 1L);
+		buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
 
-			TestUtils.checkLocalFs(outDir, 4, 0);
+		// roll due to rollover interval
+		buckets.onProcessingTime(20L);
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);
 
-			// acknowledge the checkpoint, so publish the 3 closed files, but not the open one.
-			testHarness.notifyOfCompletedCheckpoint(1L);
-			TestUtils.checkLocalFs(outDir, 1, 3);
-		}
+		// we take a checkpoint but we should not roll.
+		buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);
 	}
 
 	@Test
 	public void testRollOnCheckpointPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
 
-		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = OnCheckpointRollingPolicy.build();
-
-		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink(
-						outDir,
-						1,
-						0,
-						10L,
-						new TestUtils.TupleToStringBucketer(),
-						new SimpleStringEncoder<>(),
-						rollingPolicy,
-						new DefaultBucketFactoryImpl<>())
-		) {
-			testHarness.setup();
-			testHarness.open();
+		final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
+				new MethodCallCountingPolicyWrapper<>(OnCheckpointRollingPolicy.build());
 
-			testHarness.setProcessingTime(0L);
+		final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+		rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
-			TestUtils.checkLocalFs(outDir, 2, 0);
+		buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
+		buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
+		buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
-			TestUtils.checkLocalFs(outDir, 2, 0);
+		// ... we have a checkpoint so we roll ...
+		buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
+		rollingPolicy.verifyCallCounters(1L, 1L, 2L, 0L, 0L, 0L);
 
-			// we take a checkpoint so we roll.
-			testHarness.snapshot(1L, 1L);
+			// ... create a new in-progress file (the previous one was closed on the checkpoint, so it was null) ...
+		buckets.onElement("test1", new TestUtils.MockSinkContext(5L, 1L, 5L));
 
-			for (File file: FileUtils.listFiles(outDir, null, true)) {
-				if (Objects.equals(file.getParentFile().getName(), "test1")) {
-					Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
-				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
-					Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
-				}
-			}
+		// ... we have a checkpoint so we roll ...
+		buckets.snapshotState(2L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
+		rollingPolicy.verifyCallCounters(2L, 2L, 2L, 0L, 0L, 0L);
 
-			// this will create a new part file
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
-			TestUtils.checkLocalFs(outDir, 3, 0);
-
-			testHarness.notifyOfCompletedCheckpoint(1L);
-			for (File file: FileUtils.listFiles(outDir, null, true)) {
-				if (Objects.equals(file.getParentFile().getName(), "test1")) {
-					Assert.assertTrue(
-							file.getName().contains(".part-0-2.inprogress.") || file.getName().equals("part-0-1")
-					);
-				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
-					Assert.assertEquals("part-0-0", file.getName());
-				}
-			}
-
-			// and open and fill .part-0-2.inprogress
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
-			TestUtils.checkLocalFs(outDir, 1, 2);
-
-			// we take a checkpoint so we roll.
-			testHarness.snapshot(2L, 2L);
-
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
-			TestUtils.checkLocalFs(outDir, 2, 2);
-
-			for (File file: FileUtils.listFiles(outDir, null, true)) {
-				if (Objects.equals(file.getParentFile().getName(), "test1")) {
-					Assert.assertThat(
-							file.getName(),
-							either(containsString(".part-0-2.inprogress."))
-									.or(equalTo("part-0-1"))
-					);
-				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
-					Assert.assertThat(
-							file.getName(),
-							either(containsString(".part-0-3.inprogress."))
-									.or(equalTo("part-0-0"))
-					);
-				}
-			}
-
-			// we acknowledge the last checkpoint so we should publish all but the latest in-progress file
-			testHarness.notifyOfCompletedCheckpoint(2L);
-
-			TestUtils.checkLocalFs(outDir, 1, 3);
-			for (File file: FileUtils.listFiles(outDir, null, true)) {
-				if (Objects.equals(file.getParentFile().getName(), "test1")) {
-					Assert.assertThat(
-							file.getName(),
-							either(equalTo("part-0-2")).or(equalTo("part-0-1"))
-					);
-				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
-					Assert.assertThat(
-							file.getName(),
-							either(containsString(".part-0-3.inprogress."))
-									.or(equalTo("part-0-0"))
-					);
-				}
-			}
-		}
+		buckets.close();
 	}
 
 	@Test
 	public void testCustomRollingPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
 
-		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new RollingPolicy<Tuple2<String, Integer>, String>() {
+		final MethodCallCountingPolicyWrapper<String, String> rollingPolicy = new MethodCallCountingPolicyWrapper<>(
+				new RollingPolicy<String, String>() {
 
-			private static final long serialVersionUID = 1L;
+					private static final long serialVersionUID = 1L;
 
-			@Override
-			public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
-				return true;
-			}
+					@Override
+					public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
+						return true;
+					}
 
-			@Override
-			public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Tuple2<String, Integer> element) throws IOException {
-				// this means that 2 elements will close the part file.
-				return partFileState.getSize() > 12L;
-			}
+					@Override
+					public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element) throws IOException {
+						// this means that 2 elements will close the part file.
+						return partFileState.getSize() > 9L;
+					}
 
-			@Override
-			public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
-				return false;
-			}
-		};
-
-		try (
-				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink(
-						outDir,
-						1,
-						0,
-						10L,
-						new TestUtils.TupleToStringBucketer(),
-						new SimpleStringEncoder<>(),
-						rollingPolicy,
-						new DefaultBucketFactoryImpl<>())
-		) {
-			testHarness.setup();
-			testHarness.open();
+					@Override
+					public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
+						return currentTime - partFileState.getLastUpdateTime() >= 10L;
+					}
+				});
+
+		final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
 
-			testHarness.setProcessingTime(0L);
+		rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);
 
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
+		// the following 2 elements will close a part file because of size...
+		buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
+		buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
 
-			// the following 2 elements will close a part file ...
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+		// only one call: for the first element there was no open part file (currentPartFile == null), so a new one is opened without consulting the policy.
+		rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);
 
-			// ... and this one will open a new ...
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 2L));
-			TestUtils.checkLocalFs(outDir, 3, 0);
+		// ... and this one will trigger the roll and open a new part file...
+		buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
+		rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);
 
-			// ... and all open part files should close here.
-			testHarness.snapshot(1L, 1L);
+		// ... we have a checkpoint so we roll ...
+		buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
+		rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 0L, 0L);
 
-			// this will create and fill out a new part file
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
-			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
-			TestUtils.checkLocalFs(outDir, 4, 0);
+		// ... create a new in-progress file (the previous one was closed on the checkpoint, so it was null) ...
+		buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 5L));
 
-			// we take a checkpoint so we roll.
-			testHarness.snapshot(2L, 2L);
+		// ... last modification time is 5L, so now we DON'T roll but we check ...
+		buckets.onProcessingTime(12L);
+		rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 1L, 0L);
+
+		// ... last modification time is 5L, so now we roll
+		buckets.onProcessingTime(16L);
+		rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 2L, 1L);
+
+		buckets.close();
+	}
+
+	// ------------------------------- Utility Methods --------------------------------
+
+	private static Buckets<String, String> createBuckets(
+			final Path basePath,
+			final MethodCallCountingPolicyWrapper<String, String> rollingPolicyToTest
+	) throws IOException {
+
+		return new Buckets<>(
+				basePath,
+				new TestUtils.StringIdentityBucketAssigner(),
+				new DefaultBucketFactoryImpl<>(),
+				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+				rollingPolicyToTest,
+				0
+		);
+	}
 
-			// we acknowledge the first checkpoint so we should publish all but the latest in-progress file
-			testHarness.notifyOfCompletedCheckpoint(1L);
-			TestUtils.checkLocalFs(outDir, 1, 3);
+	/**
+	 * A wrapper of a {@link RollingPolicy} which counts how many times each method of the policy was called
+	 * and in how many of them it decided to roll.
+	 */
+	private static class MethodCallCountingPolicyWrapper<IN, BucketID> implements RollingPolicy<IN, BucketID> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final RollingPolicy<IN, BucketID> originalPolicy;
+
+		private long onCheckpointCallCounter;
+		private long onCheckpointRollCounter;
+
+		private long onEventCallCounter;
+		private long onEventRollCounter;
+
+		private long onProcessingTimeCallCounter;
+		private long onProcessingTimeRollCounter;
+
+		MethodCallCountingPolicyWrapper(final RollingPolicy<IN, BucketID> policy) {
+			this.originalPolicy = Preconditions.checkNotNull(policy);
+
+			this.onCheckpointCallCounter = 0L;
+			this.onCheckpointRollCounter = 0L;
+
+			this.onEventCallCounter = 0L;
+			this.onEventRollCounter = 0L;
+
+			this.onProcessingTimeCallCounter = 0L;
+			this.onProcessingTimeRollCounter = 0L;
+		}
+
+		@Override
+		public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
+			final boolean shouldRoll = originalPolicy.shouldRollOnCheckpoint(partFileState);
+			this.onCheckpointCallCounter++;
+			if (shouldRoll) {
+				this.onCheckpointRollCounter++;
+			}
+			return shouldRoll;
+		}
+
+		@Override
+		public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
+			final boolean shouldRoll = originalPolicy.shouldRollOnEvent(partFileState, element);
+			this.onEventCallCounter++;
+			if (shouldRoll) {
+				this.onEventRollCounter++;
+			}
+			return shouldRoll;
+		}
+
+		@Override
+		public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
+			final boolean shouldRoll = originalPolicy.shouldRollOnProcessingTime(partFileState, currentTime);
+			this.onProcessingTimeCallCounter++;
+			if (shouldRoll) {
+				this.onProcessingTimeRollCounter++;
+			}
+			return shouldRoll;
+		}
+
+		void verifyCallCounters(
+				final long onCheckpointCalls,
+				final long onCheckpointRolls,
+				final long onEventCalls,
+				final long onEventRolls,
+				final long onProcessingTimeCalls,
+				final long onProcessingTimeRolls
+		) {
+			Assert.assertEquals(onCheckpointCalls, onCheckpointCallCounter);
+			Assert.assertEquals(onCheckpointRolls, onCheckpointRollCounter);
+			Assert.assertEquals(onEventCalls, onEventCallCounter);
+			Assert.assertEquals(onEventRolls, onEventRollCounter);
+			Assert.assertEquals(onProcessingTimeCalls, onProcessingTimeCallCounter);
+			Assert.assertEquals(onProcessingTimeRolls, onProcessingTimeRollCounter);
 		}
 	}
 }
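
Reading the verifyCallCounters assertions above is easier once the argument order is spelled out: the six values follow the declaration order of the counters, i.e. (checkpointCalls, checkpointRolls, eventCalls, eventRolls, processingTimeCalls, processingTimeRolls). A hedged usage sketch, mirroring the start of testDefaultRollingPolicy:

	buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 1L));
	buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
	rollingPolicy.verifyCallCounters(
			0L, 0L,  // shouldRollOnCheckpoint: calls / rolls
			1L, 0L,  // shouldRollOnEvent: one call, since the first element had no open part file
			0L, 0L); // shouldRollOnProcessingTime: calls / rolls
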
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index bfbc12043ee..9e33064329e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -20,9 +20,11 @@
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -31,11 +33,17 @@
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -160,4 +168,109 @@ public String getBucketId(Tuple2<String, Integer> element, Context context) {
 			return SimpleVersionedStringSerializer.INSTANCE;
 		}
 	}
+
+	/**
+	 * A simple {@link BucketAssigner} that accepts {@code String}s
+	 * and returns the element itself as the bucket id.
+	 */
+	static class StringIdentityBucketAssigner implements BucketAssigner<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getBucketId(String element, BucketAssigner.Context context) {
+			return element;
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
+	/**
+	 * A mock {@link SinkFunction.Context} to be used in the tests.
+	 */
+	static class MockSinkContext implements SinkFunction.Context {
+
+		@Nullable
+		private Long elementTimestamp;
+
+		private long watermark;
+
+		private long processingTime;
+
+		MockSinkContext(
+				@Nullable Long elementTimestamp,
+				long watermark,
+				long processingTime) {
+			this.elementTimestamp = elementTimestamp;
+			this.watermark = watermark;
+			this.processingTime = processingTime;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return processingTime;
+		}
+
+		@Override
+		public long currentWatermark() {
+			return watermark;
+		}
+
+		@Nullable
+		@Override
+		public Long timestamp() {
+			return elementTimestamp;
+		}
+	}
+
+	/**
+	 * A mock {@link ListState} used for testing the snapshot/restore cycle of the sink.
+	 */
+	static class MockListState<T> implements ListState<T> {
+
+		private final List<T> backingList;
+
+		MockListState() {
+			this.backingList = new ArrayList<>();
+		}
+
+		public List<T> getBackingList() {
+			return backingList;
+		}
+
+		@Override
+		public void update(List<T> values) {
+			backingList.clear();
+			addAll(values);
+		}
+
+		@Override
+		public void addAll(List<T> values) {
+			backingList.addAll(values);
+		}
+
+		@Override
+		public Iterable<T> get() {
+			return new Iterable<T>() {
+
+				@Nonnull
+				@Override
+				public Iterator<T> iterator() {
+					return backingList.iterator();
+				}
+			};
+		}
+
+		@Override
+		public void add(T value) {
+			backingList.add(value);
+		}
+
+		@Override
+		public void clear() {
+			backingList.clear();
+		}
+	}
 }
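
MockListState is what lets the new tests run a full snapshot/restore cycle without a state backend: snapshot into the mock containers, then hand the same containers to a fresh Buckets instance. A condensed usage sketch, using the createBuckets/restoreBuckets helpers from BucketsTest above:

	final MockListState<byte[]> bucketState = new MockListState<>();
	final MockListState<Long> partCounterState = new MockListState<>();

	// snapshot the live instance into the mock state ...
	buckets.snapshotState(0L, bucketState, partCounterState);

	// ... and feed the same containers to a fresh instance to simulate recovery
	final Buckets<String, String> restored =
			restoreBuckets(path, rollingPolicy, 0, bucketState, partCounterState);
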


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> More tests to increase StreamingFileSink test coverage
> ------------------------------------------------------
>
>                 Key: FLINK-10097
>                 URL: https://issues.apache.org/jira/browse/FLINK-10097
>             Project: Flink
>          Issue Type: Sub-task
>          Components: filesystem-connector
>    Affects Versions: 1.6.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)