You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/23 12:46:35 UTC

[5/5] flink git commit: [FLINK-9921][DataStream API] Update RollingPolicy interface

[FLINK-9921][DataStream API] Update RollingPolicy interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/702f7735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/702f7735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/702f7735

Branch: refs/heads/release-1.6
Commit: 702f77355bbb9ccf560885dd5c6a717a25cafa53
Parents: fdb11c5
Author: kkloudas <kk...@gmail.com>
Authored: Fri Jul 20 17:03:16 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Mon Jul 23 14:23:32 2018 +0200

----------------------------------------------------------------------
 .../api/functions/sink/filesystem/Buckets.java  | 13 +++-------
 .../sink/filesystem/RollingPolicy.java          |  5 ++--
 .../sink/filesystem/StreamingFileSink.java      |  7 +++--
 .../rolling/policies/DefaultRollingPolicy.java  | 27 +++++---------------
 .../policies/OnCheckpointRollingPolicy.java     |  4 +--
 .../filesystem/LocalStreamingFileSinkTest.java  |  2 +-
 .../sink/filesystem/RollingPolicyTest.java      |  8 +++---
 .../functions/sink/filesystem/TestUtils.java    |  4 +--
 8 files changed, 26 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 6afba17..e6f8c00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -61,7 +61,7 @@ public class Buckets<IN, BucketID> {
 
 	private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
 
-	private final RollingPolicy<BucketID> rollingPolicy;
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
 	// --------------------------- runtime fields -----------------------------
 
@@ -95,7 +95,7 @@ public class Buckets<IN, BucketID> {
 			final Bucketer<IN, BucketID> bucketer,
 			final BucketFactory<IN, BucketID> bucketFactory,
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
-			final RollingPolicy<BucketID> rollingPolicy,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
 			final int subtaskIndex) throws IOException {
 
 		this.basePath = Preconditions.checkNotNull(basePath);
@@ -189,7 +189,6 @@ public class Buckets<IN, BucketID> {
 
 	void snapshotState(
 			final long checkpointId,
-			final long checkpointTimestamp,
 			final ListState<byte[]> bucketStates,
 			final ListState<Long> partCounterState) throws Exception {
 
@@ -201,11 +200,7 @@ public class Buckets<IN, BucketID> {
 		for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
 			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
 
-			if (info != null &&
-					(rollingPolicy.shouldRollOnCheckpoint(info) ||
-					rollingPolicy.shouldRollOnEvent(info) ||
-					rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
-			) {
+			if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
 				// we also check here so that we do not have to always
 				// wait for the "next" element to arrive.
 				bucket.closePartFile();
@@ -249,7 +244,7 @@ public class Buckets<IN, BucketID> {
 		}
 
 		final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
-		if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+		if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
 			bucket.rollPartFile(currentProcessingTime);
 		}
 		bucket.write(value, currentProcessingTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 24c38aa..b1354a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  * rolls its currently open part file and opens a new one.
  */
 @PublicEvolving
-public interface RollingPolicy<BucketID> extends Serializable {
+public interface RollingPolicy<IN, BucketID> extends Serializable {
 
 	/**
 	 * Determines if the in-progress part file for a bucket should roll on every checkpoint.
@@ -39,10 +39,11 @@ public interface RollingPolicy<BucketID> extends Serializable {
 
 	/**
 	 * Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.
+	 * @param element the element being processed.
 	 * @param partFileState the state of the currently open part file of the bucket.
 	 * @return {@code True} if the part file should roll, {@link false} otherwise.
 	 */
-	boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException;
+	boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element) throws IOException;
 
 	/**
 	 * Determines if the in-progress part file for a bucket should roll based on a time condition.

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index c208079..0ebcc4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -194,7 +194,7 @@ public class StreamingFileSink<IN>
 
 		private Bucketer<IN, BucketID> bucketer;
 
-		private RollingPolicy<BucketID> rollingPolicy;
+		private RollingPolicy<IN, BucketID> rollingPolicy;
 
 		private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
 
@@ -215,12 +215,12 @@ public class StreamingFileSink<IN>
 			return this;
 		}
 
-		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) {
+		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
 			this.rollingPolicy = Preconditions.checkNotNull(policy);
 			return this;
 		}
 
-		public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) {
+		public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, ID> policy) {
 			@SuppressWarnings("unchecked")
 			StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
 			reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
@@ -340,7 +340,6 @@ public class StreamingFileSink<IN>
 
 		buckets.snapshotState(
 				context.getCheckpointId(),
-				context.getCheckpointTimestamp(),
 				bucketStates,
 				maxPartCountersState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
index a9ff617..15c3b4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * </ol>
  */
 @PublicEvolving
-public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -67,32 +67,19 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
 	}
 
 	@Override
-	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
-		return false;
+	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
+		return partFileState.getSize() > partSize;
 	}
 
 	@Override
-	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
-		if (partFileState == null) {
-			// this means that there is no currently open part file.
-			return true;
-		}
-
+	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
 		return partFileState.getSize() > partSize;
 	}
 
 	@Override
 	public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
-		if (partFileState == null) {
-			// this means that there is no currently open part file.
-			return true;
-		}
-
-		if (currentTime - partFileState.getCreationTime() > rolloverInterval) {
-			return true;
-		}
-
-		return currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+		return currentTime - partFileState.getCreationTime() > rolloverInterval ||
+				currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
 	}
 
 	/**
@@ -150,7 +137,7 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
 		/**
 		 * Creates the actual policy.
 		 */
-		public <BucketID> DefaultRollingPolicy<BucketID> build() {
+		public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build() {
 			return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
index 4361941..df15981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 /**
  * A {@link RollingPolicy} which rolls on every checkpoint.
  */
-public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -34,7 +34,7 @@ public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<Bucket
 	}
 
 	@Override
-	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) {
+	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
 		return false;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 6e942e9..7c23918 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -494,7 +494,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 		final TestBucketFactory first = new TestBucketFactory();
 		final TestBucketFactory second = new TestBucketFactory();
 
-		final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
 				.create()
 				.withMaxPartSize(2L)
 				.withRolloverInterval(100L)

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 61e1433..078a46b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -44,7 +44,7 @@ public class RollingPolicyTest {
 	public void testDefaultRollingPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 
-		final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
 				.create()
 				.withMaxPartSize(10L)
 				.withInactivityInterval(4L)
@@ -104,7 +104,7 @@ public class RollingPolicyTest {
 	public void testRollOnCheckpointPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 
-		final RollingPolicy<String> rollingPolicy = new OnCheckpointRollingPolicy<>();
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new OnCheckpointRollingPolicy<>();
 
 		try (
 				OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink(
@@ -159,7 +159,7 @@ public class RollingPolicyTest {
 	public void testCustomRollingPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 
-		final RollingPolicy<String> rollingPolicy = new RollingPolicy<String>() {
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new RollingPolicy<Tuple2<String, Integer>, String>() {
 
 			private static final long serialVersionUID = 1L;
 
@@ -169,7 +169,7 @@ public class RollingPolicyTest {
 			}
 
 			@Override
-			public boolean shouldRollOnEvent(PartFileInfo<String> partFileState) throws IOException {
+			public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Tuple2<String, Integer> element) throws IOException {
 				// this means that 2 elements will close the part file.
 				return partFileState.getSize() > 12L;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 184e23e..9589c5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -51,7 +51,7 @@ public class TestUtils {
 			long inactivityInterval,
 			long partMaxSize) throws Exception {
 
-		final RollingPolicy<String> rollingPolicy =
+		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
 				DefaultRollingPolicy
 						.create()
 						.withMaxPartSize(partMaxSize)
@@ -84,7 +84,7 @@ public class TestUtils {
 			final long bucketCheckInterval,
 			final Bucketer<Tuple2<String, Integer>, String> bucketer,
 			final Encoder<Tuple2<String, Integer>> writer,
-			final RollingPolicy<String> rollingPolicy,
+			final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy,
 			final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
 
 		StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink