You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/16 13:07:04 UTC

[flink] branch master updated: [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c98598  [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase
3c98598 is described below

commit 3c98598c5a715563602993121cb808f1b239c89c
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Mar 14 18:23:25 2022 +0100

    [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase
---
 .../operators/lifecycle/BoundedSourceITCase.java   | 37 ++++++++++++++++++++--
 1 file changed, 34 insertions(+), 3 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
index d03e9cb..6933fe6 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
@@ -17,19 +17,27 @@
 
 package org.apache.flink.runtime.operators.lifecycle;
 
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
 import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.TestingGraphBuilder;
 import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
 import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.testutils.junit.SharedObjects;
 
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
+import java.io.IOException;
+
 import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.FINISH_SOURCES;
 import static org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS;
 import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.COMPLEX_GRAPH_BUILDER;
@@ -46,10 +54,33 @@ import static org.apache.flink.runtime.operators.lifecycle.validation.TestOperat
  * same.
  */
 @RunWith(Parameterized.class)
-public class BoundedSourceITCase extends AbstractTestBase {
+public class BoundedSourceITCase extends TestBaseUtils {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(configuration())
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
 
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
+    private static Configuration configuration() {
+        Configuration conf = new Configuration();
+
+        try {
+            FsStateChangelogStorageFactory.configure(conf, TEMPORARY_FOLDER.newFolder());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return conf;
+    }
+
     @Parameter public TestingGraphBuilder graphBuilder;
 
     @Parameterized.Parameters(name = "{0}")
@@ -68,7 +99,7 @@ public class BoundedSourceITCase extends AbstractTestBase {
                                         .setCheckpointStorage(
                                                 TEMPORARY_FOLDER.newFolder().toURI()));
 
-        TestJobExecutor.execute(testJob, MINI_CLUSTER_RESOURCE)
+        TestJobExecutor.execute(testJob, miniClusterResource)
                 .waitForEvent(CheckpointCompletedEvent.class)
                 .sendBroadcastCommand(FINISH_SOURCES, ALL_SUBTASKS)
                 .waitForTermination()