You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/08/27 11:00:03 UTC
[flink] branch master updated: [FLINK-13832] Rename
DefaultRollingPolicy create to builder
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba151c6 [FLINK-13832] Rename DefaultRollingPolicy create to builder
ba151c6 is described below
commit ba151c6b278ac7b25b7c514711639cdbd270e1fd
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Tue Aug 27 12:30:22 2019 +0200
[FLINK-13832] Rename DefaultRollingPolicy create to builder
Closes #9527
---
.../sink/filesystem/StreamingFileSink.java | 2 +-
.../rollingpolicies/DefaultRollingPolicy.java | 39 ++++++++++++++++++++--
.../sink/filesystem/BucketAssignerITCases.java | 2 +-
.../api/functions/sink/filesystem/BucketTest.java | 2 +-
.../api/functions/sink/filesystem/BucketsTest.java | 4 +--
.../sink/filesystem/RollingPolicyTest.java | 15 ++++++++-
.../api/functions/sink/filesystem/TestUtils.java | 2 +-
7 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 975265c..716b4c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -202,7 +202,7 @@ public class StreamingFileSink<IN>
private final String partFileSuffix;
RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
- this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+ this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
}
private RowFormatBuilder(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index f890326..d9bfbcc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -83,10 +83,34 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
}
/**
- * Initiates the instantiation of a {@code DefaultRollingPolicy}.
- * To finalize it and have the actual policy, call {@code .create()}.
+ * Returns the maximum part file size before rolling.
+ * @return Max size in bytes
*/
- public static DefaultRollingPolicy.PolicyBuilder create() {
+ public long getMaxPartSize() {
+ return partSize;
+ }
+
+ /**
+ * Returns the maximum time duration a part file can stay open before rolling.
+ * @return Time duration in milliseconds
+ */
+ public long getRolloverInterval() {
+ return rolloverInterval;
+ }
+
+ /**
+ * Returns time duration of allowed inactivity after which a part file will have to roll.
+ * @return Time duration in milliseconds
+ */
+ public long getInactivityInterval() {
+ return inactivityInterval;
+ }
+
+ /**
+ * Creates a new {@link PolicyBuilder} that is used to configure and build
+ * an instance of {@code DefaultRollingPolicy}.
+ */
+ public static DefaultRollingPolicy.PolicyBuilder builder() {
return new DefaultRollingPolicy.PolicyBuilder(
DEFAULT_MAX_PART_SIZE,
DEFAULT_ROLLOVER_INTERVAL,
@@ -94,7 +118,16 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
}
/**
+ * This method is {@link Deprecated}, use {@link DefaultRollingPolicy#builder()} instead.
+ */
+ @Deprecated
+ public static DefaultRollingPolicy.PolicyBuilder create() {
+ return builder();
+ }
+
+ /**
* A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+ * The {@link PolicyBuilder#build()} method must be called to instantiate the policy.
*/
@PublicEvolving
public static final class PolicyBuilder {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index 8dc44e6..cc91a33 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -46,7 +46,7 @@ public class BucketAssignerITCases {
final RollingPolicy<String, String> rollingPolicy =
DefaultRollingPolicy
- .create()
+ .builder()
.withMaxPartSize(7L)
.build();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 969ecd7..546a08c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -369,7 +369,7 @@ public class BucketTest {
private static final String bucketId = "testing-bucket";
- private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+ private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index ba2ea57..ac7071d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -107,7 +107,7 @@ public class BucketsTest {
final RollingPolicy<String, String> onCheckpointRP =
DefaultRollingPolicy
- .create()
+ .builder()
.withMaxPartSize(7L) // roll with 2 elements
.build();
@@ -316,7 +316,7 @@ public class BucketsTest {
new VerifyingBucketAssigner(timestamp, watermark, processingTime),
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
- DefaultRollingPolicy.create().build(),
+ DefaultRollingPolicy.builder().build(),
2,
new PartFileConfig()
);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index ed3ac0c..be9db3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -47,7 +47,7 @@ public class RollingPolicyTest {
final RollingPolicy<String, String> originalRollingPolicy =
DefaultRollingPolicy
- .create()
+ .builder()
.withMaxPartSize(10L)
.withInactivityInterval(4L)
.withRolloverInterval(11L)
@@ -88,6 +88,19 @@ public class RollingPolicyTest {
}
@Test
+ public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
+ DefaultRollingPolicy policy = DefaultRollingPolicy.create()
+ .withInactivityInterval(10)
+ .withMaxPartSize(20)
+ .withRolloverInterval(30)
+ .build();
+
+ Assert.assertEquals(10, policy.getInactivityInterval());
+ Assert.assertEquals(20, policy.getMaxPartSize());
+ Assert.assertEquals(30, policy.getRolloverInterval());
+ }
+
+ @Test
public void testRollOnCheckpointPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
final Path path = new Path(outDir.toURI());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 0333df6..55ba6d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -62,7 +62,7 @@ public class TestUtils {
final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
DefaultRollingPolicy
- .create()
+ .builder()
.withMaxPartSize(partMaxSize)
.withRolloverInterval(inactivityInterval)
.withInactivityInterval(inactivityInterval)