You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/10 18:00:26 UTC
[flink] branch master updated: [FLINK-24834][connectors /
filesystem] Add typed builders to DefaultRollingPolicy
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 059c00f [FLINK-24834][connectors / filesystem] Add typed builders to DefaultRollingPolicy
059c00f is described below
commit 059c00f4f6bbf0b868d4d8e0971e29d0168894f9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 8 14:26:59 2021 -0600
[FLINK-24834][connectors / filesystem] Add typed builders to DefaultRollingPolicy
This closes #17731
---
.../docs/connectors/datastream/file_sink.md | 20 ++++++----
.../docs/connectors/datastream/streamfile_sink.md | 6 +--
.../docs/connectors/datastream/file_sink.md | 20 ++++++----
.../docs/connectors/datastream/streamfile_sink.md | 12 +++---
...leWriterBucketStateSerializerMigrationTest.java | 3 +-
.../file/sink/writer/FileWriterBucketTest.java | 3 +-
.../connector/file/sink/writer/FileWriterTest.java | 9 ++++-
.../rollingpolicies/DefaultRollingPolicy.java | 45 ++++++++++++++++++++++
.../sink/filesystem/BucketAssignerITCases.java | 3 +-
.../sink/filesystem/BucketStateGenerator.java | 3 +-
.../sink/filesystem/BucketStateSerializerTest.java | 3 +-
.../sink/filesystem/BucketsRollingPolicyTest.java | 16 ++++----
.../api/functions/sink/filesystem/BucketsTest.java | 3 +-
.../filesystem/LocalStreamingFileSinkTest.java | 8 ++--
.../api/functions/sink/filesystem/TestUtils.java | 8 ++--
.../source/ContinuousFileReaderOperatorITCase.java | 5 ++-
16 files changed, 122 insertions(+), 45 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/file_sink.md b/docs/content.zh/docs/connectors/datastream/file_sink.md
index 329c7cf..406fcb5 100644
--- a/docs/content.zh/docs/connectors/datastream/file_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/file_sink.md
@@ -64,23 +64,26 @@ File Sink 会将数据写入到桶中。由于输入流可能是无界的,因
字符串元素写入示例:
-{{< tabs "946da1d5-b046-404e-ab80-a5a5d251d8ee" >}}
+{{< tabs "08046394-3912-497d-ab4b-e07a0ef1f519" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import java.time.Duration;
+
DataStream<String> input = ...;
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
@@ -92,18 +95,21 @@ input.sinkTo(sink);
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import java.time.Duration
+
val input: DataStream[String] = ...
val sink: FileSink[String] = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build()
diff --git a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
index 4561891..83818b3 100644
--- a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
@@ -104,9 +104,9 @@ val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build()
diff --git a/docs/content/docs/connectors/datastream/file_sink.md b/docs/content/docs/connectors/datastream/file_sink.md
index 0de512a..5dba048 100644
--- a/docs/content/docs/connectors/datastream/file_sink.md
+++ b/docs/content/docs/connectors/datastream/file_sink.md
@@ -76,7 +76,7 @@ that is used for serializing individual rows to the `OutputStream` of the in-pro
In addition to the bucket assigner, the RowFormatBuilder allows the user to specify:
- Custom RollingPolicy : Rolling policy to override the DefaultRollingPolicy
- - bucketCheckInterval (default = 1 min) : Millisecond interval for checking time based rolling policies
+ - bucketCheckInterval (default = 1 min) : Interval for checking time based rolling policies
Basic usage for writing String elements thus looks like this:
@@ -86,18 +86,21 @@ Basic usage for writing String elements thus looks like this:
```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import java.time.Duration;
+
DataStream<String> input = ...;
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
@@ -109,18 +112,21 @@ input.sinkTo(sink);
```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import java.time.Duration
+
val input: DataStream[String] = ...
val sink: FileSink[String] = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build()
diff --git a/docs/content/docs/connectors/datastream/streamfile_sink.md b/docs/content/docs/connectors/datastream/streamfile_sink.md
index c008898..65908de 100644
--- a/docs/content/docs/connectors/datastream/streamfile_sink.md
+++ b/docs/content/docs/connectors/datastream/streamfile_sink.md
@@ -91,9 +91,9 @@ final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
@@ -114,9 +114,9 @@ val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .withInactivityInterval(Duration.ofSeconds(10))
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build()
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
index 2b8f1ec..65f15f6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.sink.writer;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.core.fs.FileSystem;
@@ -257,7 +258,7 @@ public class FileWriterBucketStateSerializerMigrationTest {
throws IOException {
return FileWriterBucket.restore(
createBucketWriter(),
- DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+ DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
bucketState,
OutputFileConfig.builder().build());
}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
index 319b9f5..87d8295 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
@@ -46,6 +46,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -200,7 +201,7 @@ public class FileWriterBucketTest {
Path path = new Path(outDir.toURI());
RollingPolicy<String, String> onProcessingTimeRollingPolicy =
- DefaultRollingPolicy.builder().withRolloverInterval(10).build();
+ DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10)).build();
TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
FileWriterBucket<String> bucket =
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index ab55433..a8fe439 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -50,6 +50,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -219,7 +220,9 @@ public class FileWriterTest {
createWriter(
path,
new FileSinkTestUtils.StringIdentityBucketAssigner(),
- DefaultRollingPolicy.builder().withRolloverInterval(10).build(),
+ DefaultRollingPolicy.builder()
+ .withRolloverInterval(Duration.ofMillis(10))
+ .build(),
new OutputFileConfig("part-", ""),
processingTimeService,
5);
@@ -311,7 +314,9 @@ public class FileWriterTest {
createWriter(
path,
new VerifyingBucketAssigner(timestamp, watermark, processingTime),
- DefaultRollingPolicy.builder().withRolloverInterval(10).build(),
+ DefaultRollingPolicy.builder()
+ .withRolloverInterval(Duration.ofMillis(10))
+ .build(),
new OutputFileConfig("part-", ""),
processingTimeService,
5);
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index 9ede847..2c5a663 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -19,11 +19,13 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.time.Duration;
/**
* The default implementation of the {@link RollingPolicy}.
@@ -151,6 +153,18 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
*
* @param size the allowed part size.
*/
+ public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final MemorySize size) {
+ Preconditions.checkNotNull(size, "Rolling policy memory size cannot be null");
+ return new PolicyBuilder(size.getBytes(), rolloverInterval, inactivityInterval);
+ }
+
+ /**
+ * Sets the part size above which a part file will have to roll.
+ *
+ * @param size the allowed part size.
+ * @deprecated Use {@link #withMaxPartSize(MemorySize)} instead.
+ */
+ @Deprecated
public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final long size) {
Preconditions.checkState(size > 0L);
return new PolicyBuilder(size, rolloverInterval, inactivityInterval);
@@ -163,25 +177,56 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
* setting.
*
* @param interval the allowed inactivity interval.
+ * @deprecated Use {@link #withInactivityInterval(Duration)} instead.
*/
+ @Deprecated
public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final long interval) {
Preconditions.checkState(interval > 0L);
return new PolicyBuilder(partSize, rolloverInterval, interval);
}
/**
+ * Sets the interval of allowed inactivity after which a part file will have to roll. The
+ * frequency at which this is checked is controlled by the {@link
+ * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+ * setting.
+ *
+ * @param interval the allowed inactivity interval.
+ */
+ public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final Duration interval) {
+ Preconditions.checkNotNull(
+ interval, "Rolling policy inactivity interval cannot be null");
+ return new PolicyBuilder(partSize, rolloverInterval, interval.toMillis());
+ }
+
+ /**
* Sets the max time a part file can stay open before having to roll. The frequency at which
* this is checked is controlled by the {@link
* org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
* setting.
*
* @param interval the desired rollover interval.
+ * @deprecated Use {@link #withRolloverInterval(Duration)} instead.
*/
+ @Deprecated
public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final long interval) {
Preconditions.checkState(interval > 0L);
return new PolicyBuilder(partSize, interval, inactivityInterval);
}
+ /**
+ * Sets the max time a part file can stay open before having to roll. The frequency at which
+ * this is checked is controlled by the {@link
+ * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+ * setting.
+ *
+ * @param interval the desired rollover interval.
+ */
+ public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final Duration interval) {
+ Preconditions.checkNotNull(interval, "Rolling policy rollover interval cannot be null");
+ return new PolicyBuilder(partSize, interval.toMillis(), inactivityInterval);
+ }
+
/** Creates the actual policy. */
public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build() {
return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index 7d5198f..e12638a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
@@ -43,7 +44,7 @@ public class BucketAssignerITCases {
final long time = 1000L;
final RollingPolicy<String, String> rollingPolicy =
- DefaultRollingPolicy.builder().withMaxPartSize(7L).build();
+ DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(7L)).build();
final Buckets<String, String> buckets =
new Buckets<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
index d75c8f5..5fede59 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -166,7 +167,7 @@ public class BucketStateGenerator {
bucketPath,
0,
createBucketWriter(),
- DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+ DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
null,
OutputFileConfig.builder().build());
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 1ace592..c456ba7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -242,7 +243,7 @@ public class BucketStateSerializerTest {
0,
initialPartCounter,
createBucketWriter(),
- DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+ DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
bucketState,
null,
OutputFileConfig.builder().build());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
index c06b6c8..8eefba0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -32,6 +33,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
/** Tests for different {@link RollingPolicy rolling policies}. */
public class BucketsRollingPolicyTest {
@@ -45,9 +47,9 @@ public class BucketsRollingPolicyTest {
final RollingPolicy<String, String> originalRollingPolicy =
DefaultRollingPolicy.builder()
- .withMaxPartSize(10L)
- .withInactivityInterval(4L)
- .withRolloverInterval(11L)
+ .withMaxPartSize(new MemorySize(10L))
+ .withInactivityInterval(Duration.ofMillis(4L))
+ .withRolloverInterval(Duration.ofMillis(11L))
.build();
final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
@@ -87,10 +89,10 @@ public class BucketsRollingPolicyTest {
@Test
public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
DefaultRollingPolicy policy =
- DefaultRollingPolicy.create()
- .withInactivityInterval(10)
- .withMaxPartSize(20)
- .withRolloverInterval(30)
+ DefaultRollingPolicy.builder()
+ .withInactivityInterval(Duration.ofMillis(10))
+ .withMaxPartSize(new MemorySize(20))
+ .withRolloverInterval(Duration.ofMillis(30))
.build();
Assert.assertEquals(10, policy.getInactivityInterval());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index b01040e..98dd89a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -128,7 +129,7 @@ public class BucketsTest {
final RollingPolicy<String, String> onCheckpointRP =
DefaultRollingPolicy.builder()
- .withMaxPartSize(7L) // roll with 2 elements
+ .withMaxPartSize(new MemorySize(7L)) // roll with 2 elements
.build();
final MockListState<byte[]> bucketStateContainerOne = new MockListState<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index f6617d2..b8853e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.Tuple2Encoder;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.TupleToIntegerBucketer;
@@ -34,6 +35,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.time.Duration;
import java.util.Map;
/** Tests for the {@link StreamingFileSink}. */
@@ -423,9 +425,9 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final long inactivityInterval = 100L;
final RollingPolicy<Tuple2<String, Integer>, Integer> rollingPolicy =
DefaultRollingPolicy.builder()
- .withMaxPartSize(partMaxSize)
- .withRolloverInterval(inactivityInterval)
- .withInactivityInterval(inactivityInterval)
+ .withMaxPartSize(new MemorySize(partMaxSize))
+ .withRolloverInterval(Duration.ofMillis(inactivityInterval))
+ .withInactivityInterval(Duration.ofMillis(inactivityInterval))
.build();
try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index e2143be..aba2801 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -42,6 +43,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -67,9 +69,9 @@ public class TestUtils {
final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
DefaultRollingPolicy.builder()
- .withMaxPartSize(partMaxSize)
- .withRolloverInterval(inactivityInterval)
- .withInactivityInterval(inactivityInterval)
+ .withMaxPartSize(new MemorySize(partMaxSize))
+ .withRolloverInterval(Duration.ofMillis(inactivityInterval))
+ .withInactivityInterval(Duration.ofMillis(inactivityInterval))
.build();
return createRescalingTestSink(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
index dc15abf..502cb11 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.streaming.api.functions.source;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -53,7 +54,9 @@ public class ContinuousFileReaderOperatorITCase {
new SimpleStringEncoder<String>())
.withOutputFileConfig(OutputFileConfig.builder().build())
.withRollingPolicy(
- DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024).build())
+ DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .build())
.build();
stream.sinkTo(sink);
env.execute("test");