You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/10 15:42:13 UTC

[incubator-inlong] branch master updated: [INLONG-715] Make shouldRollOnCheckpoint always return true in DefaultRollingPolicy in inlong-sort

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 455b9da  [INLONG-715] Make shouldRollOnCheckpoint always return true in DefaultRollingPolicy in inlong-sort
455b9da is described below

commit 455b9da9326fe7589dcfab4856b586ccc467e247
Author: tianqiwan <ti...@tencent.com>
AuthorDate: Sat Jul 10 23:21:15 2021 +0800

    [INLONG-715] Make shouldRollOnCheckpoint always return true in DefaultRollingPolicy in inlong-sort
---
 .../org/apache/inlong/sort/flink/filesystem/DefaultRollingPolicy.java  | 2 +-
 .../test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java    | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/core/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultRollingPolicy.java b/inlong-sort/core/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultRollingPolicy.java
index 4c1c2ce..1373b78 100644
--- a/inlong-sort/core/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultRollingPolicy.java
+++ b/inlong-sort/core/src/main/java/org/apache/inlong/sort/flink/filesystem/DefaultRollingPolicy.java
@@ -55,7 +55,7 @@ public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<I
 
     @Override
     public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
-        return partFileState.getSize() > partSize;
+        return true;
     }
 
     @Override
diff --git a/inlong-sort/core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java b/inlong-sort/core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
index 7970d37..dd6f7dd 100644
--- a/inlong-sort/core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
+++ b/inlong-sort/core/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkITCase.java
@@ -249,8 +249,7 @@ public class HiveSinkITCase extends TestLogger {
         final ExecutorService executorService = Executors.newSingleThreadExecutor();
         executorService.execute(() -> {
             final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-//            env.enableCheckpointing(1000L);
-//            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(UNLIMITED_TOLERABLE_FAILURE_NUMBER);
+            env.enableCheckpointing(1000L);
             env.setRestartStrategy(RestartStrategies.noRestart());
             env.addSource(new TestingSourceFunction())
                     .setParallelism(1)