You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/08 10:42:49 UTC

[flink] branch release-1.12 updated: [FLINK-21155][tests] Fix FileSourceTextLinesITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new cbcb7c1  [FLINK-21155][tests] Fix FileSourceTextLinesITCase
cbcb7c1 is described below

commit cbcb7c17459b57c614ecc81bef6b574f7611c61d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Feb 3 19:03:11 2021 +0100

    [FLINK-21155][tests] Fix FileSourceTextLinesITCase
    
    The problem of the test case was that it is reusing the MiniCluster. Moreover, it kills for every
    TaskManager failure the TaskManager at the index 0. Since this is only possible once, all preceding
    TaskManager failure test will succeed because not TaskManager is killed.
    
    This commit fixes the problem by not reusing the MiniCluster across multiple tests.
    
    This closes #14856.
---
 .../file/src/FileSourceTextLinesITCase.java        | 37 ++++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 31a58a9..09e6539 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -20,9 +20,11 @@ package org.apache.flink.connector.file.src;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,6 +36,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -65,8 +68,8 @@ public class FileSourceTextLinesITCase extends TestLogger {
 
     @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
@@ -119,6 +122,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(PARALLELISM);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 
         final DataStream<String> stream =
                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -132,7 +136,11 @@ public class FileSourceTextLinesITCase extends TestLogger {
         final JobID jobId = client.client.getJobID();
 
         RecordCounterToFail.waitToFail();
-        triggerFailover(failoverType, jobId, RecordCounterToFail::continueProcessing);
+        triggerFailover(
+                failoverType,
+                jobId,
+                RecordCounterToFail::continueProcessing,
+                miniClusterResource.getMiniCluster());
 
         final List<String> result = new ArrayList<>();
         while (client.iterator.hasNext()) {
@@ -205,7 +213,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
             writeFile(testDir, i);
             final boolean failAfterHalfOfInput = i == LINES_PER_FILE.length / 2;
             if (failAfterHalfOfInput) {
-                triggerFailover(type, jobId, () -> {});
+                triggerFailover(type, jobId, () -> {}, miniClusterResource.getMiniCluster());
             }
         }
 
@@ -229,34 +237,35 @@ public class FileSourceTextLinesITCase extends TestLogger {
         JM
     }
 
-    private static void triggerFailover(FailoverType type, JobID jobId, Runnable afterFailAction)
+    private static void triggerFailover(
+            FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster)
             throws Exception {
         switch (type) {
             case NONE:
                 afterFailAction.run();
                 break;
             case TM:
-                restartTaskManager(afterFailAction);
+                restartTaskManager(afterFailAction, miniCluster);
                 break;
             case JM:
-                triggerJobManagerFailover(jobId, afterFailAction);
+                triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
                 break;
         }
     }
 
-    private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction)
-            throws Exception {
-        final HaLeadershipControl haLeadershipControl =
-                MINI_CLUSTER_RESOURCE.getMiniCluster().getHaLeadershipControl().get();
+    private static void triggerJobManagerFailover(
+            JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
+        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
         haLeadershipControl.revokeJobMasterLeadership(jobId).get();
         afterFailAction.run();
         haLeadershipControl.grantJobMasterLeadership(jobId).get();
     }
 
-    private static void restartTaskManager(Runnable afterFailAction) throws Exception {
-        MINI_CLUSTER_RESOURCE.getMiniCluster().terminateTaskManager(0).get();
+    private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
+            throws Exception {
+        miniCluster.terminateTaskManager(0).get();
         afterFailAction.run();
-        MINI_CLUSTER_RESOURCE.getMiniCluster().startTaskManager();
+        miniCluster.startTaskManager();
     }
 
     // ------------------------------------------------------------------------