You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/08/27 11:00:03 UTC

[flink] branch master updated: [FLINK-13832] Rename DefaultRollingPolicy create to builder

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba151c6  [FLINK-13832] Rename DefaultRollingPolicy create to builder
ba151c6 is described below

commit ba151c6b278ac7b25b7c514711639cdbd270e1fd
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Tue Aug 27 12:30:22 2019 +0200

    [FLINK-13832] Rename DefaultRollingPolicy create to builder
    
    Closes #9527
---
 .../sink/filesystem/StreamingFileSink.java         |  2 +-
 .../rollingpolicies/DefaultRollingPolicy.java      | 39 ++++++++++++++++++++--
 .../sink/filesystem/BucketAssignerITCases.java     |  2 +-
 .../api/functions/sink/filesystem/BucketTest.java  |  2 +-
 .../api/functions/sink/filesystem/BucketsTest.java |  4 +--
 .../sink/filesystem/RollingPolicyTest.java         | 15 ++++++++-
 .../api/functions/sink/filesystem/TestUtils.java   |  2 +-
 7 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 975265c..716b4c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -202,7 +202,7 @@ public class StreamingFileSink<IN>
 		private final String partFileSuffix;
 
 		RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
-			this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+			this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
 		}
 
 		private RowFormatBuilder(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index f890326..d9bfbcc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -83,10 +83,34 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
 	}
 
 	/**
-	 * Initiates the instantiation of a {@code DefaultRollingPolicy}.
-	 * To finalize it and have the actual policy, call {@code .create()}.
+	 * Returns the maximum part file size before rolling.
+	 * @return Max size in bytes
 	 */
-	public static DefaultRollingPolicy.PolicyBuilder create() {
+	public long getMaxPartSize() {
+		return partSize;
+	}
+
+	/**
+	 * Returns the maximum time duration a part file can stay open before rolling.
+	 * @return Time duration in milliseconds
+	 */
+	public long getRolloverInterval() {
+		return rolloverInterval;
+	}
+
+	/**
+	 * Returns time duration of allowed inactivity after which a part file will have to roll.
+	 * @return Time duration in milliseconds
+	 */
+	public long getInactivityInterval() {
+		return inactivityInterval;
+	}
+
+	/**
+	 * Creates a new {@link PolicyBuilder} that is used to configure and build
+	 * an instance of {@code DefaultRollingPolicy}.
+	 */
+	public static DefaultRollingPolicy.PolicyBuilder builder() {
 		return new DefaultRollingPolicy.PolicyBuilder(
 				DEFAULT_MAX_PART_SIZE,
 				DEFAULT_ROLLOVER_INTERVAL,
@@ -94,7 +118,16 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
 	}
 
 	/**
+	 * @deprecated Use {@link DefaultRollingPolicy#builder()} instead.
+	 */
+	@Deprecated
+	public static DefaultRollingPolicy.PolicyBuilder create() {
+		return builder();
+	}
+
+	/**
 	 * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+	 * The {@link PolicyBuilder#build()} method must be called to instantiate the policy.
 	 */
 	@PublicEvolving
 	public static final class PolicyBuilder {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index 8dc44e6..cc91a33 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -46,7 +46,7 @@ public class BucketAssignerITCases {
 
 		final RollingPolicy<String, String> rollingPolicy =
 			DefaultRollingPolicy
-				.create()
+				.builder()
 				.withMaxPartSize(7L)
 				.build();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 969ecd7..546a08c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -369,7 +369,7 @@ public class BucketTest {
 
 	private static final String bucketId = "testing-bucket";
 
-	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
 
 	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
 			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index ba2ea57..ac7071d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -107,7 +107,7 @@ public class BucketsTest {
 
 		final RollingPolicy<String, String> onCheckpointRP =
 				DefaultRollingPolicy
-						.create()
+						.builder()
 						.withMaxPartSize(7L) // roll with 2 elements
 						.build();
 
@@ -316,7 +316,7 @@ public class BucketsTest {
 				new VerifyingBucketAssigner(timestamp, watermark, processingTime),
 				new DefaultBucketFactoryImpl<>(),
 				new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
-				DefaultRollingPolicy.create().build(),
+				DefaultRollingPolicy.builder().build(),
 				2,
 				new PartFileConfig()
 		);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index ed3ac0c..be9db3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -47,7 +47,7 @@ public class RollingPolicyTest {
 
 		final RollingPolicy<String, String> originalRollingPolicy =
 				DefaultRollingPolicy
-						.create()
+						.builder()
 						.withMaxPartSize(10L)
 						.withInactivityInterval(4L)
 						.withRolloverInterval(11L)
@@ -88,6 +88,19 @@ public class RollingPolicyTest {
 	}
 
 	@Test
+	public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
+		DefaultRollingPolicy policy = DefaultRollingPolicy.create()
+			.withInactivityInterval(10)
+			.withMaxPartSize(20)
+			.withRolloverInterval(30)
+			.build();
+
+		Assert.assertEquals(10, policy.getInactivityInterval());
+		Assert.assertEquals(20, policy.getMaxPartSize());
+		Assert.assertEquals(30, policy.getRolloverInterval());
+	}
+
+	@Test
 	public void testRollOnCheckpointPolicy() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 		final Path path = new Path(outDir.toURI());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 0333df6..55ba6d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -62,7 +62,7 @@ public class TestUtils {
 
 		final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
 				DefaultRollingPolicy
-						.create()
+						.builder()
 						.withMaxPartSize(partMaxSize)
 						.withRolloverInterval(inactivityInterval)
 						.withInactivityInterval(inactivityInterval)