Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/10 18:00:26 UTC

[flink] branch master updated: [FLINK-24834][connectors / filesystem] Add typed builders to DefaultRollingPolicy

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 059c00f  [FLINK-24834][connectors / filesystem] Add typed builders to DefaultRollingPolicy
059c00f is described below

commit 059c00f4f6bbf0b868d4d8e0971e29d0168894f9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 8 14:26:59 2021 -0600

    [FLINK-24834][connectors / filesystem] Add typed builders to DefaultRollingPolicy
    
    This closes #17731
---
 .../docs/connectors/datastream/file_sink.md        | 20 ++++++----
 .../docs/connectors/datastream/streamfile_sink.md  |  6 +--
 .../docs/connectors/datastream/file_sink.md        | 20 ++++++----
 .../docs/connectors/datastream/streamfile_sink.md  | 12 +++---
 ...leWriterBucketStateSerializerMigrationTest.java |  3 +-
 .../file/sink/writer/FileWriterBucketTest.java     |  3 +-
 .../connector/file/sink/writer/FileWriterTest.java |  9 ++++-
 .../rollingpolicies/DefaultRollingPolicy.java      | 45 ++++++++++++++++++++++
 .../sink/filesystem/BucketAssignerITCases.java     |  3 +-
 .../sink/filesystem/BucketStateGenerator.java      |  3 +-
 .../sink/filesystem/BucketStateSerializerTest.java |  3 +-
 .../sink/filesystem/BucketsRollingPolicyTest.java  | 16 ++++----
 .../api/functions/sink/filesystem/BucketsTest.java |  3 +-
 .../filesystem/LocalStreamingFileSinkTest.java     |  8 ++--
 .../api/functions/sink/filesystem/TestUtils.java   |  8 ++--
 .../source/ContinuousFileReaderOperatorITCase.java |  5 ++-
 16 files changed, 122 insertions(+), 45 deletions(-)
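
For readers skimming the diff below: the user-facing change is that DefaultRollingPolicy.PolicyBuilder gains overloads taking java.time.Duration and org.apache.flink.configuration.MemorySize, and the raw long-based setters are deprecated. A minimal before/after sketch of user code, adapted from the documentation snippets touched by this commit (illustrative only; the variable names oldStyle/newStyle are made up and the concrete thresholds are examples, not recommendations):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Before: plain longs, with the units (milliseconds, bytes) left implicit.
DefaultRollingPolicy<String, String> oldStyle =
    DefaultRollingPolicy.builder()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
        .withMaxPartSize(1024 * 1024 * 1024)
        .build();

// After: typed arguments make the units explicit; the long-based setters are now deprecated.
DefaultRollingPolicy<String, String> newStyle =
    DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .withMaxPartSize(MemorySize.ofMebiBytes(1024))
        .build();
```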

diff --git a/docs/content.zh/docs/connectors/datastream/file_sink.md b/docs/content.zh/docs/connectors/datastream/file_sink.md
index 329c7cf..406fcb5 100644
--- a/docs/content.zh/docs/connectors/datastream/file_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/file_sink.md
@@ -64,23 +64,26 @@ File Sink writes data into buckets. Since the input stream can be unbounded,
 Example of writing String elements:
 
 
-{{< tabs "946da1d5-b046-404e-ab80-a5a5d251d8ee" >}}
+{{< tabs "08046394-3912-497d-ab4b-e07a0ef1f519" >}}
 {{< tab "Java" >}}
 ```java
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 
+import java.time.Duration;
+
 DataStream<String> input = ...;
 
 final FileSink<String> sink = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
 	.build();
 
@@ -92,18 +95,21 @@ input.sinkTo(sink);
 ```scala
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.MemorySize
 import org.apache.flink.connector.file.sink.FileSink
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 
+import java.time.Duration
+
 val input: DataStream[String] = ...
 
 val sink: FileSink[String] = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
     .build()
 
diff --git a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
index 4561891..83818b3 100644
--- a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
@@ -104,9 +104,9 @@ val sink: StreamingFileSink[String] = StreamingFileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
     .build()
 
diff --git a/docs/content/docs/connectors/datastream/file_sink.md b/docs/content/docs/connectors/datastream/file_sink.md
index 0de512a..5dba048 100644
--- a/docs/content/docs/connectors/datastream/file_sink.md
+++ b/docs/content/docs/connectors/datastream/file_sink.md
@@ -76,7 +76,7 @@ that is used for serializing individual rows to the `OutputStream` of the in-pro
 In addition to the bucket assigner, the RowFormatBuilder allows the user to specify:
 
  - Custom RollingPolicy : Rolling policy to override the DefaultRollingPolicy
- - bucketCheckInterval (default = 1 min) : Millisecond interval for checking time based rolling policies
+ - bucketCheckInterval (default = 1 min) : Interval for checking time based rolling policies
 
 Basic usage for writing String elements thus looks like this:
 
@@ -86,18 +86,21 @@ Basic usage for writing String elements thus looks like this:
 ```java
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 
+import java.time.Duration;
+
 DataStream<String> input = ...;
 
 final FileSink<String> sink = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
 	.build();
 
@@ -109,18 +112,21 @@ input.sinkTo(sink);
 ```scala
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.MemorySize
 import org.apache.flink.connector.file.sink.FileSink
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 
+import java.time.Duration
+
 val input: DataStream[String] = ...
 
 val sink: FileSink[String] = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
     .build()
 
diff --git a/docs/content/docs/connectors/datastream/streamfile_sink.md b/docs/content/docs/connectors/datastream/streamfile_sink.md
index c008898..65908de 100644
--- a/docs/content/docs/connectors/datastream/streamfile_sink.md
+++ b/docs/content/docs/connectors/datastream/streamfile_sink.md
@@ -91,9 +91,9 @@ final StreamingFileSink<String> sink = StreamingFileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
 	.build();
 
@@ -114,9 +114,9 @@ val sink: StreamingFileSink[String] = StreamingFileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
     .build()
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
index 2b8f1ec..65f15f6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
 import org.apache.flink.core.fs.FileSystem;
@@ -257,7 +258,7 @@ public class FileWriterBucketStateSerializerMigrationTest {
             throws IOException {
         return FileWriterBucket.restore(
                 createBucketWriter(),
-                DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+                DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
                 bucketState,
                 OutputFileConfig.builder().build());
     }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
index 319b9f5..87d8295 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
@@ -46,6 +46,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -200,7 +201,7 @@ public class FileWriterBucketTest {
         Path path = new Path(outDir.toURI());
 
         RollingPolicy<String, String> onProcessingTimeRollingPolicy =
-                DefaultRollingPolicy.builder().withRolloverInterval(10).build();
+                DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10)).build();
 
         TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
         FileWriterBucket<String> bucket =
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index ab55433..a8fe439 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -50,6 +50,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -219,7 +220,9 @@ public class FileWriterTest {
                 createWriter(
                         path,
                         new FileSinkTestUtils.StringIdentityBucketAssigner(),
-                        DefaultRollingPolicy.builder().withRolloverInterval(10).build(),
+                        DefaultRollingPolicy.builder()
+                                .withRolloverInterval(Duration.ofMillis(10))
+                                .build(),
                         new OutputFileConfig("part-", ""),
                         processingTimeService,
                         5);
@@ -311,7 +314,9 @@ public class FileWriterTest {
                 createWriter(
                         path,
                         new VerifyingBucketAssigner(timestamp, watermark, processingTime),
-                        DefaultRollingPolicy.builder().withRolloverInterval(10).build(),
+                        DefaultRollingPolicy.builder()
+                                .withRolloverInterval(Duration.ofMillis(10))
+                                .build(),
                         new OutputFileConfig("part-", ""),
                         processingTimeService,
                         5);
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index 9ede847..2c5a663 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -19,11 +19,13 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.time.Duration;
 
 /**
  * The default implementation of the {@link RollingPolicy}.
@@ -151,6 +153,18 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
          *
          * @param size the allowed part size.
          */
+        public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final MemorySize size) {
+            Preconditions.checkNotNull(size, "Rolling policy memory size cannot be null");
+            return new PolicyBuilder(size.getBytes(), rolloverInterval, inactivityInterval);
+        }
+
+        /**
+         * Sets the part size above which a part file will have to roll.
+         *
+         * @param size the allowed part size.
+         * @deprecated Use {@link #withMaxPartSize(MemorySize)} instead.
+         */
+        @Deprecated
         public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final long size) {
             Preconditions.checkState(size > 0L);
             return new PolicyBuilder(size, rolloverInterval, inactivityInterval);
@@ -163,25 +177,56 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
          * setting.
          *
          * @param interval the allowed inactivity interval.
+         * @deprecated Use {@link #withInactivityInterval(Duration)} instead.
          */
+        @Deprecated
         public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final long interval) {
             Preconditions.checkState(interval > 0L);
             return new PolicyBuilder(partSize, rolloverInterval, interval);
         }
 
         /**
+         * Sets the interval of allowed inactivity after which a part file will have to roll. The
+         * frequency at which this is checked is controlled by the {@link
+         * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+         * setting.
+         *
+         * @param interval the allowed inactivity interval.
+         */
+        public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final Duration interval) {
+            Preconditions.checkNotNull(
+                    interval, "Rolling policy inactivity interval cannot be null");
+            return new PolicyBuilder(partSize, rolloverInterval, interval.toMillis());
+        }
+
+        /**
          * Sets the max time a part file can stay open before having to roll. The frequency at which
          * this is checked is controlled by the {@link
          * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
          * setting.
          *
          * @param interval the desired rollover interval.
+         * @deprecated Use {@link #withRolloverInterval(Duration)} instead.
          */
+        @Deprecated
         public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final long interval) {
             Preconditions.checkState(interval > 0L);
             return new PolicyBuilder(partSize, interval, inactivityInterval);
         }
 
+        /**
+         * Sets the max time a part file can stay open before having to roll. The frequency at which
+         * this is checked is controlled by the {@link
+         * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+         * setting.
+         *
+         * @param interval the desired rollover interval.
+         */
+        public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final Duration interval) {
+            Preconditions.checkNotNull(interval, "Rolling policy rollover interval cannot be null");
+            return new PolicyBuilder(partSize, interval.toMillis(), inactivityInterval);
+        }
+
         /** Creates the actual policy. */
         public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build() {
             return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
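
As the added setters above show, the Duration and MemorySize overloads are thin wrappers: they store interval.toMillis() and size.getBytes() into the same fields used by the deprecated long-based setters, so the resulting policies behave identically. A small equivalence sketch (illustrative only, not part of the commit; it uses getInactivityInterval(), the same getter the existing tests further down assert against):

```java
import java.time.Duration;

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Deprecated setter: the long is interpreted as milliseconds.
DefaultRollingPolicy<String, String> viaLong =
    DefaultRollingPolicy.builder()
        .withInactivityInterval(5 * 60 * 1000L)
        .build();

// Typed setter: the Duration is stored via toMillis().
DefaultRollingPolicy<String, String> viaDuration =
    DefaultRollingPolicy.builder()
        .withInactivityInterval(Duration.ofMinutes(5))
        .build();

// Both report the same inactivity interval in milliseconds.
assert viaLong.getInactivityInterval() == viaDuration.getInactivityInterval();
```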
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index 7d5198f..e12638a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
@@ -43,7 +44,7 @@ public class BucketAssignerITCases {
         final long time = 1000L;
 
         final RollingPolicy<String, String> rollingPolicy =
-                DefaultRollingPolicy.builder().withMaxPartSize(7L).build();
+                DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(7L)).build();
 
         final Buckets<String, String> buckets =
                 new Buckets<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
index d75c8f5..5fede59 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -166,7 +167,7 @@ public class BucketStateGenerator {
                 bucketPath,
                 0,
                 createBucketWriter(),
-                DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+                DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
                 null,
                 OutputFileConfig.builder().build());
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 1ace592..c456ba7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -242,7 +243,7 @@ public class BucketStateSerializerTest {
                 0,
                 initialPartCounter,
                 createBucketWriter(),
-                DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
+                DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10)).build(),
                 bucketState,
                 null,
                 OutputFileConfig.builder().build());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
index c06b6c8..8eefba0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -32,6 +33,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 
 /** Tests for different {@link RollingPolicy rolling policies}. */
 public class BucketsRollingPolicyTest {
@@ -45,9 +47,9 @@ public class BucketsRollingPolicyTest {
 
         final RollingPolicy<String, String> originalRollingPolicy =
                 DefaultRollingPolicy.builder()
-                        .withMaxPartSize(10L)
-                        .withInactivityInterval(4L)
-                        .withRolloverInterval(11L)
+                        .withMaxPartSize(new MemorySize(10L))
+                        .withInactivityInterval(Duration.ofMillis(4L))
+                        .withRolloverInterval(Duration.ofMillis(11L))
                         .build();
 
         final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
@@ -87,10 +89,10 @@ public class BucketsRollingPolicyTest {
     @Test
     public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
         DefaultRollingPolicy policy =
-                DefaultRollingPolicy.create()
-                        .withInactivityInterval(10)
-                        .withMaxPartSize(20)
-                        .withRolloverInterval(30)
+                DefaultRollingPolicy.builder()
+                        .withInactivityInterval(Duration.ofMillis(10))
+                        .withMaxPartSize(new MemorySize(20))
+                        .withRolloverInterval(Duration.ofMillis(30))
                         .build();
 
         Assert.assertEquals(10, policy.getInactivityInterval());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index b01040e..98dd89a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -128,7 +129,7 @@ public class BucketsTest {
 
         final RollingPolicy<String, String> onCheckpointRP =
                 DefaultRollingPolicy.builder()
-                        .withMaxPartSize(7L) // roll with 2 elements
+                        .withMaxPartSize(new MemorySize(7L)) // roll with 2 elements
                         .build();
 
         final MockListState<byte[]> bucketStateContainerOne = new MockListState<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index f6617d2..b8853e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.Tuple2Encoder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.TupleToIntegerBucketer;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Map;
 
 /** Tests for the {@link StreamingFileSink}. */
@@ -423,9 +425,9 @@ public class LocalStreamingFileSinkTest extends TestLogger {
         final long inactivityInterval = 100L;
         final RollingPolicy<Tuple2<String, Integer>, Integer> rollingPolicy =
                 DefaultRollingPolicy.builder()
-                        .withMaxPartSize(partMaxSize)
-                        .withRolloverInterval(inactivityInterval)
-                        .withInactivityInterval(inactivityInterval)
+                        .withMaxPartSize(new MemorySize(partMaxSize))
+                        .withRolloverInterval(Duration.ofMillis(inactivityInterval))
+                        .withInactivityInterval(Duration.ofMillis(inactivityInterval))
                         .build();
 
         try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index e2143be..aba2801 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -42,6 +43,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -67,9 +69,9 @@ public class TestUtils {
 
         final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
                 DefaultRollingPolicy.builder()
-                        .withMaxPartSize(partMaxSize)
-                        .withRolloverInterval(inactivityInterval)
-                        .withInactivityInterval(inactivityInterval)
+                        .withMaxPartSize(new MemorySize(partMaxSize))
+                        .withRolloverInterval(Duration.ofMillis(inactivityInterval))
+                        .withInactivityInterval(Duration.ofMillis(inactivityInterval))
                         .build();
 
         return createRescalingTestSink(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
index dc15abf..502cb11 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/functions/source/ContinuousFileReaderOperatorITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.test.streaming.api.functions.source;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -53,7 +54,9 @@ public class ContinuousFileReaderOperatorITCase {
                                 new SimpleStringEncoder<String>())
                         .withOutputFileConfig(OutputFileConfig.builder().build())
                         .withRollingPolicy(
-                                DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024).build())
+                                DefaultRollingPolicy.builder()
+                                        .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                        .build())
                         .build();
         stream.sinkTo(sink);
         env.execute("test");