You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/05/13 14:50:18 UTC

[flink] branch release-1.13 updated (cfdf001 -> 25489aa)

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cfdf001  [FLINK-22618][runtime] Fix incorrect free resource metrics of task managers
     new 51680fa  [hotfix] Add missing TestLogger to Kinesis tests
     new 25489aa  [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kinesis/FlinkKinesisITCase.java     |  3 +-
 .../kinesis/FlinkKinesisProducerTest.java          |  3 +-
 .../connectors/kinesis/KinesisConsumerTest.java    |  3 +-
 .../adaptive/StateWithExecutionGraph.java          |  2 -
 .../adaptive/AdaptiveSchedulerSimpleITCase.java    | 44 ++++++++++++++++++++++
 5 files changed, 50 insertions(+), 5 deletions(-)

[flink] 01/02: [hotfix] Add missing TestLogger to Kinesis tests

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51680fadfd3fe6cfe63e6c89000a9b3fdaae0353
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon May 10 15:46:21 2021 +0200

    [hotfix] Add missing TestLogger to Kinesis tests
---
 .../apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java  | 3 ++-
 .../flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java   | 3 ++-
 .../apache/flink/streaming/connectors/kinesis/KinesisConsumerTest.java | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 0b224c3..3db1dd3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -49,7 +50,7 @@ import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
 
 /** IT cases for using Kinesis consumer/producer based on Kinesalite. */
-public class FlinkKinesisITCase {
+public class FlinkKinesisITCase extends TestLogger {
     public static final String TEST_STREAM = "test_stream";
 
     @ClassRule
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
index d5643ec..387d2d3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.MockSerializationSchema;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
 
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@@ -63,7 +64,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /** Suite of {@link FlinkKinesisProducer} tests. */
-public class FlinkKinesisProducerTest {
+public class FlinkKinesisProducerTest extends TestLogger {
 
     @Rule public ExpectedException exception = ExpectedException.none();
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/KinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/KinesisConsumerTest.java
index 2d695ab..6ebe4a4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/KinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/KinesisConsumerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kinesis;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -33,7 +34,7 @@ import java.util.Properties;
  * Tests for {@link FlinkKinesisConsumer}. In contrast to tests in {@link FlinkKinesisConsumerTest}
  * it does not use power mock, which makes it possible to use e.g. the {@link ExpectedException}.
  */
-public class KinesisConsumerTest {
+public class KinesisConsumerTest extends TestLogger {
 
     @Rule public ExpectedException thrown = ExpectedException.none();
 

[flink] 02/02: [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25489aa0cc536f1551a0a887bdfc690824a24bd9
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Fri May 7 08:35:04 2021 +0200

    [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.
    
    The Canceling state of Adaptive Scheduler was expecting the ExecutionGraph to be in state RUNNING when entering the state.
    However, the Restarting state is cancelling the ExecutionGraph already, thus the ExecutionGraph can be in state CANCELLING or CANCELED when entering the Canceling state.
    
    Calling the ExecutionGraph.cancel() method in the Canceling state while the ExecutionGraph is already in state CANCELLING or CANCELED is not a problem.
    
    The change is guarded by a new ITCase, as this issue affects the interplay between different AS states.
---
 .../adaptive/StateWithExecutionGraph.java          |  2 -
 .../adaptive/AdaptiveSchedulerSimpleITCase.java    | 44 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 9962c78..91d688f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -91,8 +91,6 @@ abstract class StateWithExecutionGraph implements State {
         this.operatorCoordinatorHandler = operatorCoordinatorHandler;
         this.kvStateHandler = new KvStateHandler(executionGraph);
         this.logger = logger;
-        Preconditions.checkState(
-                executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
 
         FutureUtils.assertNoException(
                 executionGraph
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
index 5c14764..3155ab7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -34,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -44,6 +47,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
@@ -110,6 +114,34 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
     }
 
     @Test
+    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
+        final long timeInRestartingState = 10000L;
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
+        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
+        alwaysFailingOperator.setParallelism(1);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        executionConfig.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+
+        // now cancel while in RESTARTING state
+        miniCluster.cancelJob(jobGraph.getJobID()).get();
+    }
+
+    @Test
     public void testGlobalFailoverIfTaskFails() throws Throwable {
         assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
 
@@ -167,4 +199,16 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
             hasFailed = false;
         }
     }
+
+    /** Always failing {@link AbstractInvokable}. */
+    public static final class AlwaysFailingInvokable extends AbstractInvokable {
+        public AlwaysFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new FlinkRuntimeException("Test failure.");
+        }
+    }
 }