You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/16 13:07:04 UTC
[flink] branch master updated: [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3c98598 [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase
3c98598 is described below
commit 3c98598c5a715563602993121cb808f1b239c89c
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Mar 14 18:23:25 2022 +0100
[FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase
---
.../operators/lifecycle/BoundedSourceITCase.java | 37 ++++++++++++++++++++--
1 file changed, 34 insertions(+), 3 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
index d03e9cb..6933fe6 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
@@ -17,19 +17,27 @@
package org.apache.flink.runtime.operators.lifecycle;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.TestingGraphBuilder;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.SharedObjects;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
+import java.io.IOException;
+
import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.FINISH_SOURCES;
import static org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS;
import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.COMPLEX_GRAPH_BUILDER;
@@ -46,10 +54,33 @@ import static org.apache.flink.runtime.operators.lifecycle.validation.TestOperat
* same.
*/
@RunWith(Parameterized.class)
-public class BoundedSourceITCase extends AbstractTestBase {
+public class BoundedSourceITCase extends TestBaseUtils {
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(configuration())
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build());
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
+ private static Configuration configuration() {
+ Configuration conf = new Configuration();
+
+ try {
+ FsStateChangelogStorageFactory.configure(conf, TEMPORARY_FOLDER.newFolder());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return conf;
+ }
+
@Parameter public TestingGraphBuilder graphBuilder;
@Parameterized.Parameters(name = "{0}")
@@ -68,7 +99,7 @@ public class BoundedSourceITCase extends AbstractTestBase {
.setCheckpointStorage(
TEMPORARY_FOLDER.newFolder().toURI()));
- TestJobExecutor.execute(testJob, MINI_CLUSTER_RESOURCE)
+ TestJobExecutor.execute(testJob, miniClusterResource)
.waitForEvent(CheckpointCompletedEvent.class)
.sendBroadcastCommand(FINISH_SOURCES, ALL_SUBTASKS)
.waitForTermination()