You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this rendering.)
Posted to commits@flink.apache.org by ch...@apache.org on 2018/03/23 18:12:03 UTC
[1/7] flink git commit: [FLINK-8964][tests] Port JobSubmissionFailsITCase to flip6
Repository: flink
Updated Branches:
refs/heads/master 9ee02f6a3 -> 2ac3474c4
[FLINK-8964][tests] Port JobSubmissionFailsITCase to flip6
This closes #5727.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adeff926
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adeff926
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adeff926
Branch: refs/heads/master
Commit: adeff9267ff23ebd14de39533341713241f25dfb
Parents: 0ce8574
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 11:40:48 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
.../failing/JobSubmissionFailsITCase.java | 169 ++++++-------------
1 file changed, 55 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/adeff926/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index a647af9..ecd16a1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -19,30 +19,25 @@
package org.apache.flink.test.example.failing;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Optional;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -51,47 +46,32 @@ import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class JobSubmissionFailsITCase extends TestLogger {
+ private static final int NUM_TM = 2;
private static final int NUM_SLOTS = 20;
- private static LocalFlinkMiniCluster cluster;
- private static JobGraph workingJobGraph;
-
- @BeforeClass
- public static void setup() {
- try {
- Configuration config = new Configuration();
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
-
- cluster = new LocalFlinkMiniCluster(config);
-
- cluster.start();
-
- final JobVertex jobVertex = new JobVertex("Working job vertex.");
- jobVertex.setInvokableClass(NoOpInvokable.class);
- workingJobGraph = new JobGraph("Working testing job", jobVertex);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ NUM_TM,
+ NUM_SLOTS / NUM_TM),
+ true);
+
+ private static Configuration getConfiguration() {
+ Configuration config = new Configuration();
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+ return config;
}
- @AfterClass
- public static void teardown() {
- try {
- cluster.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ private static JobGraph getWorkingJobGraph() {
+ final JobVertex jobVertex = new JobVertex("Working job vertex.");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+ return new JobGraph("Working testing job", jobVertex);
}
// --------------------------------------------------------------------------------------------
- private boolean detached;
+ private final boolean detached;
public JobSubmissionFailsITCase(boolean detached) {
this.detached = detached;
@@ -105,90 +85,51 @@ public class JobSubmissionFailsITCase extends TestLogger {
// --------------------------------------------------------------------------------------------
- private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
- if (detached) {
- cluster.submitJobDetached(jobGraph);
- return null;
- }
- else {
- return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
- }
- }
-
@Test
- public void testExceptionInInitializeOnMaster() {
- try {
- final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
- failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
- final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
+ public void testExceptionInInitializeOnMaster() throws Exception {
+ final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
+ failingJobVertex.setInvokableClass(NoOpInvokable.class);
- try {
- submitJob(failingJobGraph);
- fail("Expected JobExecutionException.");
- }
- catch (JobExecutionException e) {
- assertEquals("Test exception.", e.getCause().getMessage());
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
+ final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ client.setDetached(detached);
- @Test
- public void testSubmitEmptyJobGraph() {
try {
- final JobGraph jobGraph = new JobGraph("Testing job");
-
- try {
- submitJob(jobGraph);
- fail("Expected JobSubmissionException.");
- }
- catch (JobSubmissionException e) {
- assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+ client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+ fail("Job submission should have thrown an exception.");
+ } catch (Exception e) {
+ Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+ candidate -> "Test exception.".equals(candidate.getMessage()));
+ if (!expectedCause.isPresent()) {
+ throw e;
}
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
+
+ client.setDetached(false);
+ client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
}
@Test
- public void testSubmitNullJobGraph() {
+ public void testSubmitEmptyJobGraph() throws Exception {
+ final JobGraph jobGraph = new JobGraph("Testing job");
+
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ client.setDetached(detached);
+
try {
- try {
- submitJob(null);
- fail("Expected JobSubmissionException.");
- }
- catch (NullPointerException e) {
- // yo!
+ client.submitJob(jobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+ fail("Job submission should have thrown an exception.");
+ } catch (Exception e) {
+ Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+ throwable -> throwable.getMessage() != null && throwable.getMessage().contains("empty"));
+ if (!expectedCause.isPresent()) {
+ throw e;
}
- catch (Throwable t) {
- t.printStackTrace();
- fail("Caught wrong exception of type " + t.getClass() + ".");
- }
-
- cluster.submitJobAndWait(workingJobGraph, false);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
+
+ client.setDetached(false);
+ client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
}
// --------------------------------------------------------------------------------------------
[5/7] flink git commit: [FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6
Posted by ch...@apache.org.
[FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6
This closes #5720.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0623e24c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0623e24c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0623e24c
Branch: refs/heads/master
Commit: 0623e24c8814e073426062e8b27bf88e664ee3aa
Parents: 4f5488c
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 14:17:34 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
flink-metrics/flink-metrics-jmx/pom.xml | 6 ++
.../jobmanager/JMXJobManagerMetricTest.java | 69 +++++++++++---------
2 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0623e24c/flink-metrics/flink-metrics-jmx/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml
index d738a7e..123f8e7 100644
--- a/flink-metrics/flink-metrics-jmx/pom.xml
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -85,5 +85,11 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0623e24c/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 6770ec3..c00b5d3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -18,38 +18,40 @@
package org.apache.flink.runtime.jobmanager;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.MiniClusterResource;
import org.junit.Assert;
+import org.junit.ClassRule;
import org.junit.Test;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
+import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.junit.Assert.assertEquals;
/**
@@ -57,24 +59,31 @@ import static org.junit.Assert.assertEquals;
*/
public class JMXJobManagerMetricTest {
- /**
- * Tests that metrics registered on the JobManager are actually accessible via JMX.
- */
- @Test
- public void testJobManagerJMXMetricAccess() throws Exception {
- Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 1,
+ 1),
+ true);
+
+ private static Configuration getConfiguration() {
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
- flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
-
flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
- TestingCluster flink = new TestingCluster(flinkConfiguration);
+ return flinkConfiguration;
+ }
- try {
- flink.start();
+ /**
+ * Tests that metrics registered on the JobManager are actually accessible via JMX.
+ */
+ @Test
+ public void testJobManagerJMXMetricAccess() throws Exception {
+ Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
+ try {
JobVertex sourceJobVertex = new JobVertex("Source");
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
@@ -92,28 +101,26 @@ public class JMXJobManagerMetricTest {
true),
null));
- flink.waitForActorsToBeAlive();
-
- flink.submitJobDetached(jobGraph);
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+ client.setDetached(true);
+ client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());
- Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft());
- Await.ready(jobRunning, deadline.timeLeft());
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> client.getJobStatus(jobGraph.getJobID()),
+ Time.milliseconds(10),
+ deadline,
+ status -> status == JobStatus.RUNNING,
+ TestingUtils.defaultScheduledExecutor()
+ ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> nameSet = mBeanServer.queryNames(new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"), null);
Assert.assertEquals(1, nameSet.size());
assertEquals(-1L, mBeanServer.getAttribute(nameSet.iterator().next(), "Value"));
- Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
- .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
BlockingInvokable.unblock();
-
- // wait til the job has finished
- Await.ready(jobFinished, deadline.timeLeft());
} finally {
- flink.stop();
+ BlockingInvokable.unblock();
}
}
[2/7] flink git commit: [hotfix][utils] Add ExceptionUtils#findThrowable with predicate
Posted by ch...@apache.org.
[hotfix][utils] Add ExceptionUtils#findThrowable with predicate
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ce85746
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ce85746
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ce85746
Branch: refs/heads/master
Commit: 0ce8574603aac784da30a354f608ff939e09ef58
Parents: 0c56e19
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 11:38:24 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ce85746/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6af16fc..459648f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -35,6 +35,7 @@ import java.io.StringWriter;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -326,6 +327,30 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether a throwable chain contains an exception matching a predicate and returns it.
+ *
+ * @param throwable the throwable chain to check.
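+ * @param predicate the predicate that a throwable in the chain must satisfy.
+ * @return Optional containing the first throwable in the chain (starting with {@code throwable} itself) that matches the predicate; empty if none matches or if either argument is null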
+ */
+ public static Optional<Throwable> findThrowable(Throwable throwable, Predicate<Throwable> predicate) {
+ if (throwable == null || predicate == null) {
+ return Optional.empty();
+ }
+
+ Throwable t = throwable;
+ while (t != null) {
+ if (predicate.test(t)) {
+ return Optional.of(t);
+ } else {
+ t = t.getCause();
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ /**
* Checks whether a throwable chain contains a specific error message and returns the corresponding throwable.
*
* @param throwable the throwable chain to check.
[7/7] flink git commit: [FLINK-8965][tests] Port TimestampITCase to flip6
Posted by ch...@apache.org.
[FLINK-8965][tests] Port TimestampITCase to flip6
This closes #5728.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac3474c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac3474c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac3474c
Branch: refs/heads/master
Commit: 2ac3474c4482b0366a905bce345dc9f90e64ba2f
Parents: adeff92
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 17:19:15 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:50 2018 +0100
----------------------------------------------------------------------
.../test/streaming/runtime/TimestampITCase.java | 73 ++++++++++----------
1 file changed, 38 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ac3474c/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 5e08e8a..3b46c82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,17 +45,18 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -75,34 +76,24 @@ public class TimestampITCase extends TestLogger {
// this is used in some tests to synchronize
static MultiShotLatch latch;
- private static LocalFlinkMiniCluster cluster;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ NUM_TASK_MANAGERS,
+ NUM_TASK_SLOTS),
+ true);
- @Before
- public void setupLatch() {
- // ensure that we get a fresh latch for each test
- latch = new MultiShotLatch();
- }
-
- @BeforeClass
- public static void startCluster() {
+ private static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-
- cluster = new LocalFlinkMiniCluster(config, false);
-
- cluster.start();
-
- TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+ return config;
}
- @AfterClass
- public static void shutdownCluster() {
- cluster.stop();
- cluster = null;
-
- TestStreamEnvironment.unsetAsContext();
+ @Before
+ public void setupLatch() {
+ // ensure that we get a fresh latch for each test
+ latch = new MultiShotLatch();
}
/**
@@ -162,7 +153,8 @@ public class TimestampITCase extends TestLogger {
public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
// for this test to work, we need to be sure that no other jobs are being executed
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ final ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+ while (!getRunningJobs(clusterClient).isEmpty()) {
Thread.sleep(100);
}
@@ -185,14 +177,15 @@ public class TimestampITCase extends TestLogger {
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.addSink(new DiscardingSink<Integer>());
- new Thread("stopper") {
+ Thread t = new Thread("stopper") {
@Override
public void run() {
try {
// try until we get the running jobs
- List<JobID> running;
- while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
+ List<JobID> running = getRunningJobs(clusterClient);
+ while (running.isEmpty()) {
Thread.sleep(10);
+ running = getRunningJobs(clusterClient);
}
JobID id = running.get(0);
@@ -200,7 +193,7 @@ public class TimestampITCase extends TestLogger {
// send stop until the job is stopped
do {
try {
- cluster.stopJob(id);
+ clusterClient.stop(id);
}
catch (Exception e) {
if (e.getCause() instanceof IllegalStateException) {
@@ -214,13 +207,14 @@ public class TimestampITCase extends TestLogger {
}
Thread.sleep(10);
}
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
+ while (!getRunningJobs(clusterClient).isEmpty());
}
catch (Throwable t) {
t.printStackTrace();
}
}
- }.start();
+ };
+ t.start();
env.execute();
@@ -246,6 +240,7 @@ public class TimestampITCase extends TestLogger {
subtaskWatermarks.get(subtaskWatermarks.size() - 1));
}
}
+ t.join();
}
/**
@@ -855,4 +850,12 @@ public class TimestampITCase extends TestLogger {
@Override
public void cancel() {}
}
+
+ private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+ return statusMessages.stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
}
[4/7] flink git commit: [FLINK-8956][tests] Port RescalingITCase to flip6
Posted by ch...@apache.org.
[FLINK-8956][tests] Port RescalingITCase to flip6
This closes #5715.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edb6f7fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edb6f7fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edb6f7fe
Branch: refs/heads/master
Commit: edb6f7fef8c5df6af43bbe28f96d8c6bb3332d00
Parents: 9ee02f6
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 11:36:39 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
.../test/checkpointing/RescalingITCase.java | 282 +++++++------------
1 file changed, 97 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edb6f7fe/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index a23c679..e4f4389 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -25,23 +25,24 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -50,7 +51,9 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -62,25 +65,20 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Test savepoint rescaling.
@@ -106,7 +104,7 @@ public class RescalingITCase extends TestLogger {
NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
}
- private static TestingCluster cluster;
+ private static MiniClusterResource cluster;
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -120,8 +118,6 @@ public class RescalingITCase extends TestLogger {
currentBackend = backend;
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
final File checkpointDir = temporaryFolder.newFolder();
final File savepointDir = temporaryFolder.newFolder();
@@ -130,15 +126,20 @@ public class RescalingITCase extends TestLogger {
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
- cluster = new TestingCluster(config);
- cluster.start();
+ cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ config,
+ numTaskManagers,
+ slotsPerTaskManager),
+ true);
+ cluster.before();
}
}
@AfterClass
public static void shutDownExistingCluster() {
if (cluster != null) {
- cluster.stop();
+ cluster.after();
cluster = null;
}
}
@@ -175,20 +176,18 @@ public class RescalingITCase extends TestLogger {
final int parallelism2 = scaleOut ? numSlots : numSlots / 2;
final int maxParallelism = 13;
- FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
- Deadline deadline = timeout.fromNow();
+ Duration timeout = Duration.ofMinutes(3);
+ Deadline deadline = Deadline.now().plus(timeout);
- ActorGateway jobManager = null;
- JobID jobID = null;
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = createJobGraphWithKeyedState(parallelism, maxParallelism, numberKeys, numberElements, false, 100);
- jobID = jobGraph.getJobID();
+ final JobID jobID = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
// wait til the sources have emitted numberElements for each key and completed a checkpoint
SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
@@ -210,22 +209,15 @@ public class RescalingITCase extends TestLogger {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-
- final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
- Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
- Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
+ final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+ client.cancel(jobID);
- assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
-
- Await.ready(jobRemovedFuture, deadline.timeLeft());
-
- jobID = null;
+ while (!getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
@@ -233,11 +225,8 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- jobID = scaledJobGraph.getJobID();
-
- cluster.submitJobAndWait(scaledJobGraph, false);
-
- jobID = null;
+ client.setDetached(false);
+ client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
@@ -253,17 +242,6 @@ public class RescalingITCase extends TestLogger {
} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-
- // clear any left overs from a possibly failed job
- if (jobID != null && jobManager != null) {
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
- try {
- Await.ready(jobRemovedFuture, timeout);
- } catch (TimeoutException | InterruptedException ie) {
- fail("Failed while cleaning up the cluster.");
- }
- }
}
}
@@ -279,57 +257,39 @@ public class RescalingITCase extends TestLogger {
final int parallelism2 = numSlots;
final int maxParallelism = 13;
- FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
- Deadline deadline = timeout.fromNow();
+ Duration timeout = Duration.ofMinutes(3);
+ Deadline deadline = Deadline.now().plus(timeout);
- JobID jobID = null;
- ActorGateway jobManager = null;
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
- jobID = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
+ final JobID jobID = jobGraph.getJobID();
- Object savepointResponse = null;
+ client.setDetached(true);
+ client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
- Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
- FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
- savepointResponse = Await.result(savepointPathFuture, waitingTime);
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
- assertTrue(String.valueOf(savepointResponse), savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
+ final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
+ client.cancel(jobID);
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
- Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
- Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
-
- assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
-
- Await.ready(jobRemovedFuture, deadline.timeLeft());
+ while (!getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
// job successfully removed
- jobID = null;
-
JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- jobID = scaledJobGraph.getJobID();
-
- cluster.submitJobAndWait(scaledJobGraph, false);
-
- jobID = null;
-
+ client.setDetached(false);
+ client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
} catch (JobExecutionException exception) {
if (exception.getCause() instanceof IllegalStateException) {
// we expect a IllegalStateException wrapped
@@ -338,17 +298,6 @@ public class RescalingITCase extends TestLogger {
} else {
throw exception;
}
- } finally {
- // clear any left overs from a possibly failed job
- if (jobID != null && jobManager != null) {
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
- try {
- Await.ready(jobRemovedFuture, timeout);
- } catch (TimeoutException | InterruptedException ie) {
- fail("Failed while cleaning up the cluster.");
- }
- }
}
}
@@ -367,14 +316,12 @@ public class RescalingITCase extends TestLogger {
int parallelism2 = numSlots;
int maxParallelism = 13;
- FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
- Deadline deadline = timeout.fromNow();
+ Duration timeout = Duration.ofMinutes(3);
+ Deadline deadline = Deadline.now().plus(timeout);
- ActorGateway jobManager = null;
- JobID jobID = null;
+ ClusterClient<?> client = cluster.getClusterClient();
try {
- jobManager = cluster.getLeaderGateway(deadline.timeLeft());
JobGraph jobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
parallelism,
@@ -385,9 +332,10 @@ public class RescalingITCase extends TestLogger {
false,
100);
- jobID = jobGraph.getJobID();
+ final JobID jobID = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ client.setDetached(true);
+ client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
// wait til the sources have emitted numberElements for each key and completed a checkpoint
SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
@@ -409,22 +357,15 @@ public class RescalingITCase extends TestLogger {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-
- final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
- Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
- Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
- Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+ final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
+ client.cancel(jobID);
- Await.ready(jobRemovedFuture, deadline.timeLeft());
-
- jobID = null;
+ while (!getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
JobGraph scaledJobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
parallelism2,
@@ -437,11 +378,8 @@ public class RescalingITCase extends TestLogger {
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- jobID = scaledJobGraph.getJobID();
-
- cluster.submitJobAndWait(scaledJobGraph, false);
-
- jobID = null;
+ client.setDetached(false);
+ client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
@@ -457,17 +395,6 @@ public class RescalingITCase extends TestLogger {
} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
-
- // clear any left overs from a possibly failed job
- if (jobID != null && jobManager != null) {
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
- try {
- Await.ready(jobRemovedFuture, timeout);
- } catch (TimeoutException | InterruptedException ie) {
- fail("Failed while cleaning up the cluster.");
- }
- }
}
}
@@ -510,11 +437,10 @@ public class RescalingITCase extends TestLogger {
final int parallelism2 = scaleOut ? numSlots / 2 : numSlots;
final int maxParallelism = 13;
- FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
- Deadline deadline = timeout.fromNow();
+ Duration timeout = Duration.ofMinutes(3);
+ Deadline deadline = Deadline.now().plus(timeout);
- JobID jobID = null;
- ActorGateway jobManager = null;
+ ClusterClient<?> client = cluster.getClusterClient();
int counterSize = Math.max(parallelism, parallelism2);
@@ -528,54 +454,44 @@ public class RescalingITCase extends TestLogger {
}
try {
- jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
- jobID = jobGraph.getJobID();
-
- cluster.submitJobDetached(jobGraph);
+ final JobID jobID = jobGraph.getJobID();
- Object savepointResponse = null;
+ client.setDetached(true);
+ client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
// wait until the operator is started
StateSourceBase.workStartedLatch.await();
- while (deadline.hasTimeLeft()) {
-
- Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
- FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
- savepointResponse = Await.result(savepointPathFuture, waitingTime);
-
- if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
- break;
- }
- }
-
- assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
-
- final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
-
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
- Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
- Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+ CompletableFuture<String> savepointPathFuture = FutureUtils.retryWithDelay(
+ () -> {
+ try {
+ return client.triggerSavepoint(jobID, null);
+ } catch (FlinkException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ },
+ (int) deadline.timeLeft().getSeconds() / 10,
+ Time.seconds(10),
+ (throwable) -> true,
+ TestingUtils.defaultScheduledExecutor()
+ );
- assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
+ final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- Await.ready(jobRemovedFuture, deadline.timeLeft());
+ client.cancel(jobID);
- // job successfully removed
- jobID = null;
+ while (!getRunningJobs(client).isEmpty()) {
+ Thread.sleep(50);
+ }
JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, checkpointMethod);
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
- jobID = scaledJobGraph.getJobID();
-
- cluster.submitJobAndWait(scaledJobGraph, false);
+ client.setDetached(false);
+ client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
int sumExp = 0;
int sumAct = 0;
@@ -609,19 +525,7 @@ public class RescalingITCase extends TestLogger {
}
assertEquals(sumExp, sumAct);
- jobID = null;
-
} finally {
- // clear any left overs from a possibly failed job
- if (jobID != null && jobManager != null) {
- Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
- try {
- Await.ready(jobRemovedFuture, timeout);
- } catch (TimeoutException | InterruptedException ie) {
- fail("Failed while cleaning up the cluster.");
- }
- }
}
}
@@ -1028,4 +932,12 @@ public class RescalingITCase extends TestLogger {
}
}
}
+
+ /** Returns the IDs of all listed jobs that are not yet in a globally terminal state. */
+ private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> jobs = client.listJobs().get();
+ return jobs.stream().filter(job -> !job.getJobState().isGloballyTerminalState())
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
}
[3/7] flink git commit: [FLINK-8959][tests] Port
AccumulatorLiveITCase to flip6
Posted by ch...@apache.org.
[FLINK-8959][tests] Port AccumulatorLiveITCase to flip6
This closes #5719.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f5488c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f5488c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f5488c5
Branch: refs/heads/master
Commit: 4f5488c592fe153897042d24f9bd03b50767ba9a
Parents: edb6f7f
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 13:59:22 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
.../accumulators/AccumulatorLiveITCase.java | 336 +++++-----------
.../LegacyAccumulatorLiveITCase.java | 386 +++++++++++++++++++
2 files changed, 482 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4f5488c5/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 756b81e..ff362dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -18,292 +18,199 @@
package org.apache.flink.test.accumulators;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.fail;
-
/**
- * Tests the availability of accumulator results during runtime. The test case tests a user-defined
- * accumulator and Flink's internal accumulators for two consecutive tasks.
- *
- * <p>CHAINED[Source -> Map] -> Sink
- *
- * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
- * the task to the task manager which notifies the job manager and sends the current accumulators.
- * The task blocks until the test has been notified about the current accumulator values.
- *
- * <p>A barrier between the operators ensures that that pipelining is disabled for the streaming test.
- * The batch job reads the records one at a time. The streaming code buffers the records beforehand;
- * that's why exact guarantees about the number of records read are very hard to make. Thus, why we
- * check for an upper bound of the elements read.
+ * Tests the availability of accumulator results during runtime.
*/
+@Category(Flip6.class)
public class AccumulatorLiveITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
- private static ActorSystem system;
- private static ActorGateway jobManagerGateway;
- private static ActorRef taskManager;
-
- private static JobID jobID;
- private static JobGraph jobGraph;
-
// name of user accumulator
private static final String ACCUMULATOR_NAME = "test";
+ private static final long HEARTBEAT_INTERVAL = 50L;
+
// number of heartbeat intervals to check
private static final int NUM_ITERATIONS = 5;
- private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
+ private static final List<Integer> inputData = new ArrayList<>(NUM_ITERATIONS);
- private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+ static {
+ // generate test data
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ inputData.add(i);
+ }
+ }
- @Before
- public void before() throws Exception {
- system = AkkaUtils.createLocalActorSystem(new Configuration());
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 1,
+ 1),
+ true);
+ private static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
- TestingCluster testingCluster = new TestingCluster(config, false, true);
- testingCluster.start();
-
- jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
- taskManager = testingCluster.getTaskManagersAsJava().get(0);
-
- // generate test data
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- inputData.add(i, String.valueOf(i + 1));
- }
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
- NotifyingMapper.finished = false;
+ return config;
}
- @After
- public void after() throws Exception {
- JavaTestKit.shutdownActorSystem(system);
- inputData.clear();
+ @Before
+ public void resetLatches() throws InterruptedException {
+ NotifyingMapper.reset();
}
@Test
public void testBatch() throws Exception {
-
- /** The program **/
- ExecutionEnvironment env = new BatchPlanExtractor();
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
- DataSet<String> input = env.fromCollection(inputData);
+ DataSet<Integer> input = env.fromCollection(inputData);
input
.flatMap(new NotifyingMapper())
- .output(new NotifyingOutputFormat());
-
- env.execute();
+ .output(new DummyOutputFormat());
// Extract job graph and set job id for the task to notify of accumulator changes.
- jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
- jobID = jobGraph.getJobID();
+ JobGraph jobGraph = getJobGraph(env.createProgramPlan());
- verifyResults();
+ submitJobAndVerifyResults(jobGraph);
}
@Test
public void testStreaming() throws Exception {
- StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
- DataStream<String> input = env.fromCollection(inputData);
+ DataStream<Integer> input = env.fromCollection(inputData);
input
.flatMap(new NotifyingMapper())
- .writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
+ .writeUsingOutputFormat(new DummyOutputFormat()).disableChaining();
- jobGraph = env.getStreamGraph().getJobGraph();
- jobID = jobGraph.getJobID();
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- verifyResults();
+ submitJobAndVerifyResults(jobGraph);
}
- private static void verifyResults() {
- new JavaTestKit(system) {{
-
- ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
-
- // register for accumulator changes
- jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
- expectMsgEquals(TIMEOUT, true);
-
- // submit job
-
- jobManagerGateway.tell(
- new JobManagerMessages.SubmitJob(
- jobGraph,
- ListeningBehaviour.EXECUTION_RESULT),
- selfGateway);
- expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
-
- TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
- Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
-
- ExecutionAttemptID mapperTaskID = null;
-
- ExecutionAttemptID sinkTaskID = null;
-
- /* Check for accumulator values */
- if (checkUserAccumulators(0, userAccumulators)) {
- LOG.info("Passed initial check for map task.");
- } else {
- fail("Wrong accumulator results when map task begins execution.");
- }
-
- int expectedAccVal = 0;
-
- /* for mapper task */
- for (int i = 1; i <= NUM_ITERATIONS; i++) {
- expectedAccVal += i;
-
- // receive message
- msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
- userAccumulators = msg.userAccumulators();
-
- LOG.info("{}", userAccumulators);
-
- if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
- LOG.info("Passed round #" + i);
- } else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
- // we determined the wrong task id and need to switch the two here
- ExecutionAttemptID temp = mapperTaskID;
- mapperTaskID = sinkTaskID;
- sinkTaskID = temp;
- LOG.info("Passed round #" + i);
- } else {
- fail("Failed in round #" + i);
- }
- }
-
- msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
- userAccumulators = msg.userAccumulators();
-
- if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
- LOG.info("Passed initial check for sink task.");
- } else {
- fail("Wrong accumulator results when sink task begins execution.");
- }
-
- /* for sink task */
- for (int i = 1; i <= NUM_ITERATIONS; i++) {
-
- // receive message
- msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-
- userAccumulators = msg.userAccumulators();
-
- LOG.info("{}", userAccumulators);
-
- if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
- LOG.info("Passed round #" + i);
- } else {
- fail("Failed in round #" + i);
- }
- }
-
- expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
-
- }};
- }
-
- private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
- LOG.info("checking user accumulators");
- return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+ /** Submits the job detached, waits until the live accumulator reports NUM_ITERATIONS, then releases the mapper. */
+ private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
+ Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+
+ client.setDetached(true);
+ client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+
+ try {
+ // bounded wait: if the job fails before the mapper sees all records, fail instead of hanging forever
+ NotifyingMapper.notifyLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> {
+ try {
+ return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID()));
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ },
+ Time.milliseconds(20),
+ deadline,
+ accumulators -> accumulators.size() == 1
+ && accumulators.containsKey(ACCUMULATOR_NAME)
+ && (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
+ TestingUtils.defaultScheduledExecutor()
+ ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ } finally {
+ // release the mapper's blocking close() on success and on failure so the job can terminate
+ NotifyingMapper.shutdownLatch.trigger();
+ }
+ }
/**
* UDF that notifies when it changes the accumulator values.
*/
- private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
+ private static class NotifyingMapper extends RichFlatMapFunction<Integer, Integer> {
private static final long serialVersionUID = 1L;
- private IntCounter counter = new IntCounter();
+ private static final OneShotLatch notifyLatch = new OneShotLatch();
+ private static final OneShotLatch shutdownLatch = new OneShotLatch();
- private static boolean finished = false;
+ private final IntCounter counter = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
- notifyTaskManagerOfAccumulatorUpdate();
}
@Override
- public void flatMap(String value, Collector<Integer> out) throws Exception {
- int val = Integer.valueOf(value);
- counter.add(val);
- out.collect(val);
+ public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+ counter.add(1);
+ out.collect(value);
LOG.debug("Emitting value {}.", value);
- notifyTaskManagerOfAccumulatorUpdate();
+ if (counter.getLocalValuePrimitive() == NUM_ITERATIONS) { // all input seen; the test may now poll the accumulator
+ notifyLatch.trigger();
+ }
}
@Override
public void close() throws Exception {
- finished = true;
+ shutdownLatch.await();
+ }
+
+ private static void reset() throws InterruptedException {
+ notifyLatch.reset();
+ shutdownLatch.reset();
}
}
/**
- * Outputs format which notifies of accumulator changes and waits for the previous mapper.
+ * No-op output format; records written to it are discarded.
*/
- private static class NotifyingOutputFormat implements OutputFormat<Integer> {
+ private static class DummyOutputFormat implements OutputFormat<Integer> {
private static final long serialVersionUID = 1L;
@Override
@@ -312,17 +219,10 @@ public class AccumulatorLiveITCase extends TestLogger {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- while (!NotifyingMapper.finished) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- }
- notifyTaskManagerOfAccumulatorUpdate();
}
@Override
public void writeRecord(Integer record) throws IOException {
- notifyTaskManagerOfAccumulatorUpdate();
}
@Override
@@ -331,56 +231,12 @@ public class AccumulatorLiveITCase extends TestLogger {
}
/**
- * Notify task manager of accumulator update and wait until the Heartbeat containing the message
- * has been reported.
- */
- public static void notifyTaskManagerOfAccumulatorUpdate() {
- new JavaTestKit(system) {{
- Timeout timeout = new Timeout(TIMEOUT);
- Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
- try {
- Await.result(ask, timeout.duration());
- } catch (Exception e) {
- fail("Failed to notify task manager of accumulator update.");
- }
- }};
- }
-
- /**
* Helpers to generate the JobGraph.
*/
- private static JobGraph getOptimizedPlan(Plan plan) {
+ private static JobGraph getJobGraph(Plan plan) {
Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
JobGraphGenerator jgg = new JobGraphGenerator();
OptimizedPlan op = pc.compile(plan);
return jgg.compileJobGraph(op);
}
-
- private static class BatchPlanExtractor extends LocalEnvironment {
-
- private Plan plan = null;
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- plan = createProgramPlan();
- return new JobExecutionResult(new JobID(), -1, null);
- }
- }
-
- /**
- * This is used to for creating the example topology. {@link #execute} is never called, we
- * only use this to call {@link #getStreamGraph()}.
- */
- private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
- @Override
- public JobExecutionResult execute() throws Exception {
- return execute("default");
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- throw new RuntimeException("This should not be called.");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4f5488c5/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
new file mode 100644
index 0000000..6595b10
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the availability of accumulator results during runtime. The test case checks a user-defined
+ * accumulator across two consecutive tasks.
+ *
+ * <p>CHAINED[Source -> Map] -> Sink
+ *
+ * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
+ * the task to the task manager which notifies the job manager and sends the current accumulators.
+ * The task blocks until the task manager has answered that message.
+ *
+ * <p>The sink's output format does not leave {@code open()} before the mapper has been closed, so
+ * all accumulator updates of the map task are observed before those of the sink task. For the
+ * streaming job, chaining between the mapper and the sink is additionally disabled. Each check
+ * compares the reported value of the user-defined accumulator for exact equality.
+ */
+public class LegacyAccumulatorLiveITCase extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyAccumulatorLiveITCase.class);
+
+	private static ActorSystem system;
+	private static TestingCluster testingCluster;
+	private static ActorGateway jobManagerGateway;
+	private static ActorRef taskManager;
+
+	private static JobID jobID;
+	private static JobGraph jobGraph;
+
+	// name of user accumulator
+	private static final String ACCUMULATOR_NAME = "test";
+
+	// number of input records, i.e. the number of accumulator updates checked per task
+	private static final int NUM_ITERATIONS = 5;
+
+	private static final List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
+
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	@Before
+	public void before() throws Exception {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		// keep a reference so that after() can shut the cluster down again
+		testingCluster = new TestingCluster(config, false, true);
+		testingCluster.start();
+
+		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+		taskManager = testingCluster.getTaskManagersAsJava().get(0);
+
+		// generate test data
+		for (int i = 0; i < NUM_ITERATIONS; i++) {
+			inputData.add(i, String.valueOf(i + 1));
+		}
+
+		NotifyingMapper.finished = false;
+	}
+
+	@After
+	public void after() throws Exception {
+		try {
+			// stop the cluster started in before(); otherwise every test leaks a running cluster
+			if (testingCluster != null) {
+				testingCluster.stop();
+				testingCluster = null;
+			}
+		} finally {
+			JavaTestKit.shutdownActorSystem(system);
+			inputData.clear();
+		}
+	}
+
+	@Test
+	public void testBatch() throws Exception {
+
+		// The program; BatchPlanExtractor#execute only captures the plan and does not run it
+		ExecutionEnvironment env = new BatchPlanExtractor();
+		env.setParallelism(1);
+
+		DataSet<String> input = env.fromCollection(inputData);
+		input
+			.flatMap(new NotifyingMapper())
+			.output(new NotifyingOutputFormat());
+
+		env.execute();
+
+		// Extract job graph and set job id for the task to notify of accumulator changes.
+		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+	@Test
+	public void testStreaming() throws Exception {
+
+		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<String> input = env.fromCollection(inputData);
+		input
+			.flatMap(new NotifyingMapper())
+			.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
+
+		jobGraph = env.getStreamGraph().getJobGraph();
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+	/**
+	 * Submits {@link #jobGraph} and checks the user accumulator value carried by every accumulator
+	 * update: one initial update plus one per record for the map task, then the same for the
+	 * sink task, followed by the successful job result.
+	 */
+	private static void verifyResults() {
+		new JavaTestKit(system) {{
+
+			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
+
+			// register for accumulator changes
+			jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
+			expectMsgEquals(TIMEOUT, true);
+
+			// submit job
+			jobManagerGateway.tell(
+				new JobManagerMessages.SubmitJob(
+					jobGraph,
+					ListeningBehaviour.EXECUTION_RESULT),
+				selfGateway);
+			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
+
+			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
+
+			/* Check for accumulator values */
+			if (checkUserAccumulators(0, userAccumulators)) {
+				LOG.info("Passed initial check for map task.");
+			} else {
+				fail("Wrong accumulator results when map task begins execution.");
+			}
+
+			int expectedAccVal = 0;
+
+			/* for mapper task */
+			for (int i = 1; i <= NUM_ITERATIONS; i++) {
+				expectedAccVal += i;
+
+				// receive message
+				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+				userAccumulators = msg.userAccumulators();
+
+				LOG.info("{}", userAccumulators);
+
+				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+					LOG.info("Passed round #" + i);
+				} else {
+					fail("Failed in round #" + i);
+				}
+			}
+
+			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+			userAccumulators = msg.userAccumulators();
+
+			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+				LOG.info("Passed initial check for sink task.");
+			} else {
+				fail("Wrong accumulator results when sink task begins execution.");
+			}
+
+			/* for sink task */
+			for (int i = 1; i <= NUM_ITERATIONS; i++) {
+
+				// receive message
+				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+
+				userAccumulators = msg.userAccumulators();
+
+				LOG.info("{}", userAccumulators);
+
+				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+					LOG.info("Passed round #" + i);
+				} else {
+					fail("Failed in round #" + i);
+				}
+			}
+
+			expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
+
+		}};
+	}
+
+	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
+		LOG.info("checking user accumulators");
+		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+	}
+
+	/**
+	 * UDF that notifies when it changes the accumulator values.
+	 */
+	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private IntCounter counter = new IntCounter();
+
+		// polled by NotifyingOutputFormat#open from another task's thread, so it must be
+		// volatile for the write in close() to become visible there
+		private static volatile boolean finished = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void flatMap(String value, Collector<Integer> out) throws Exception {
+			int val = Integer.parseInt(value);
+			counter.add(val);
+			out.collect(val);
+			LOG.debug("Emitting value {}.", value);
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void close() throws Exception {
+			finished = true;
+		}
+	}
+
+	/**
+	 * Output format which notifies of accumulator changes and waits for the previous mapper.
+	 */
+	private static class NotifyingOutputFormat implements OutputFormat<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+			// wait until the mapper has been closed, so that all of its updates are reported first
+			while (!NotifyingMapper.finished) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					// interrupted (e.g. on cancellation): restore the flag and stop waiting
+					Thread.currentThread().interrupt();
+					throw new InterruptedIOException("Interrupted while waiting for the mapper to finish.");
+				}
+			}
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void writeRecord(Integer record) throws IOException {
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void close() throws IOException {
+		}
+	}
+
+	/**
+	 * Notifies the task manager of an accumulator update and blocks until it has answered the
+	 * request. Throws an {@link AssertionError} if the request fails or exceeds {@link #TIMEOUT}.
+	 */
+	public static void notifyTaskManagerOfAccumulatorUpdate() {
+		Timeout timeout = new Timeout(TIMEOUT);
+		Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
+		try {
+			Await.result(ask, timeout.duration());
+		} catch (Exception e) {
+			// keep the cause so that the reason for the failed notification is visible in the task failure
+			throw new AssertionError("Failed to notify task manager of accumulator update.", e);
+		}
+	}
+
+	/**
+	 * Compiles the given batch {@link Plan} into a {@link JobGraph}.
+	 */
+	private static JobGraph getOptimizedPlan(Plan plan) {
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		OptimizedPlan op = pc.compile(plan);
+		return jgg.compileJobGraph(op);
+	}
+
+	/**
+	 * Captures the program {@link Plan} when executed instead of running the job.
+	 */
+	private static class BatchPlanExtractor extends LocalEnvironment {
+
+		private Plan plan = null;
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			plan = createProgramPlan();
+			return new JobExecutionResult(new JobID(), -1, null);
+		}
+	}
+
+	/**
+	 * This is used for creating the example topology. {@link #execute} is never called, we
+	 * only use this to call {@link #getStreamGraph()}.
+	 */
+	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+		@Override
+		public JobExecutionResult execute() throws Exception {
+			return execute("default");
+		}
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			throw new RuntimeException("This should not be called.");
+		}
+	}
+}
[6/7] flink git commit: [FLINK-8958][tests] Port
TaskCancelAsyncProducerConsumerITCase to flip6
Posted by ch...@apache.org.
[FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6
This closes #5722.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c56e191
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c56e191
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c56e191
Branch: refs/heads/master
Commit: 0c56e1917aa3a563a7425ba98ff33ed9bfcd22c5
Parents: 0623e24
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 15:16:18 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:11:49 2018 +0100
----------------------------------------------------------------------
...cyTaskCancelAsyncProducerConsumerITCase.java | 287 +++++++++++++++++++
.../TaskCancelAsyncProducerConsumerITCase.java | 82 +++---
2 files changed, 329 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c56e191/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
new file mode 100644
index 0000000..ee0bfda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that a job whose records are produced and consumed by threads spawned from the task's
+ * invokable can be cancelled while those threads are blocked on network buffers. Runs against
+ * the legacy {@link TestingCluster}.
+ */
+public class LegacyTaskCancelAsyncProducerConsumerITCase extends TestLogger {
+
+	// The Exceptions thrown by the producer/consumer Threads
+	private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
+	private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
+
+	// The Threads producing/consuming the intermediate stream
+	private static volatile Thread ASYNC_PRODUCER_THREAD;
+	private static volatile Thread ASYNC_CONSUMER_THREAD;
+
+	/**
+	 * Tests that a task waiting on an async producer/consumer that is stuck
+	 * in a blocking buffer request can be properly cancelled.
+	 *
+	 * <p>This is currently required for the Flink Kafka sources, which spawn
+	 * a separate Thread consuming from Kafka and producing the intermediate
+	 * streams in the spawned Thread instead of the main task Thread.
+	 */
+	@Test
+	public void testCancelAsyncProducerAndConsumer() throws Exception {
+		// reset state that an earlier run of this test in the same JVM may have published
+		ASYNC_PRODUCER_EXCEPTION = null;
+		ASYNC_CONSUMER_EXCEPTION = null;
+		ASYNC_PRODUCER_THREAD = null;
+		ASYNC_CONSUMER_THREAD = null;
+
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		TestingCluster flink = null;
+
+		try {
+			// Cluster
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+			flink = new TestingCluster(config, true);
+			flink.start();
+
+			// Job with async producer and consumer
+			JobVertex producer = new JobVertex("AsyncProducer");
+			producer.setParallelism(1);
+			producer.setInvokableClass(AsyncProducer.class);
+
+			JobVertex consumer = new JobVertex("AsyncConsumer");
+			consumer.setParallelism(1);
+			consumer.setInvokableClass(AsyncConsumer.class);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
+			producer.setSlotSharingGroup(slot);
+			consumer.setSlotSharingGroup(slot);
+
+			JobGraph jobGraph = new JobGraph(producer, consumer);
+
+			// Submit job and wait until running
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+			flink.submitJobDetached(jobGraph);
+
+			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(runningFuture, deadline.timeLeft());
+
+			// Wait for blocking requests, cancel and wait for cancellation
+			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+
+			boolean producerBlocked = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_PRODUCER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					StackTraceElement[] stackTrace = thread.getStackTrace();
+					producerBlocked = isInBlockingBufferRequest(stackTrace);
+				}
+
+				if (producerBlocked) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that async producer is in blocking request; the thread may never have been published
+			Thread producerThread = ASYNC_PRODUCER_THREAD;
+			String producerState = producerThread == null
+				? "<producer thread not started>"
+				: Arrays.toString(producerThread.getStackTrace());
+			assertTrue("Producer thread is not blocked: " + producerState, producerBlocked);
+
+			boolean consumerWaiting = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_CONSUMER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					consumerWaiting = thread.getState() == Thread.State.WAITING;
+				}
+
+				if (consumerWaiting) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that async consumer is in blocking request
+			assertTrue("Consumer thread is not blocked.", consumerWaiting);
+
+			msg = new CancelJob(jobGraph.getJobID());
+			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(cancelFuture, deadline.timeLeft());
+
+			Await.ready(cancelledFuture, deadline.timeLeft());
+
+			// Verify the expected Exceptions
+			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+
+			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
+		} finally {
+			if (flink != null) {
+				flink.stop();
+			}
+		}
+	}
+
+	/**
+	 * Invokable emitting records in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncProducer extends AbstractInvokable {
+
+		public AsyncProducer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread producer = new ProducerThread(getEnvironment().getWriter(0));
+
+			// Publish the async producer for the main test Thread
+			ASYNC_PRODUCER_THREAD = producer;
+
+			producer.start();
+
+			// Wait for the producer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (producer.isAlive()) {
+				try {
+					producer.join();
+				} catch (InterruptedException ignored) {
+					// deliberately ignored: keep waiting until the producer thread has terminated
+				}
+			}
+		}
+
+		/**
+		 * The Thread emitting the records.
+		 */
+		private static class ProducerThread extends Thread {
+
+			private final RecordWriter<LongValue> recordWriter;
+
+			public ProducerThread(ResultPartitionWriter partitionWriter) {
+				this.recordWriter = new RecordWriter<>(partitionWriter);
+			}
+
+			@Override
+			public void run() {
+				LongValue current = new LongValue(0);
+
+				try {
+					while (true) {
+						current.setValue(current.getValue() + 1);
+						recordWriter.emit(current);
+						recordWriter.flushAll();
+					}
+				} catch (Exception e) {
+					ASYNC_PRODUCER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Invokable consuming buffers in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncConsumer extends AbstractInvokable {
+
+		public AsyncConsumer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0));
+
+			// Publish the async consumer for the main test Thread
+			ASYNC_CONSUMER_THREAD = consumer;
+
+			consumer.start();
+
+			// Wait for the consumer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (consumer.isAlive()) {
+				try {
+					consumer.join();
+				} catch (InterruptedException ignored) {
+					// deliberately ignored: keep waiting until the consumer thread has terminated
+				}
+			}
+		}
+
+		/**
+		 * The Thread consuming buffers.
+		 */
+		private static class ConsumerThread extends Thread {
+
+			private final InputGate inputGate;
+
+			public ConsumerThread(InputGate inputGate) {
+				this.inputGate = inputGate;
+			}
+
+			@Override
+			public void run() {
+				try {
+					while (true) {
+						inputGate.getNextBufferOrEvent();
+					}
+				} catch (Exception e) {
+					ASYNC_CONSUMER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c56e191/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index c63af83..4b73b09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,11 +18,12 @@
package org.apache.flink.runtime.taskmanager;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -33,28 +34,26 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+@Category(Flip6.class)
public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
// The Exceptions thrown by the producer/consumer Threads
@@ -75,18 +74,20 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
*/
@Test
public void testCancelAsyncProducerAndConsumer() throws Exception {
- Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
- TestingCluster flink = null;
-
- try {
- // Cluster
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
- config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
- config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
-
- flink = new TestingCluster(config, true);
+ Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
+
+ // Cluster
+ Configuration config = new Configuration();
+ config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+ MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+
+ try (MiniCluster flink = new MiniCluster(miniClusterConfiguration)) {
flink.start();
// Job with async producer and consumer
@@ -106,16 +107,15 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
JobGraph jobGraph = new JobGraph(producer, consumer);
// Submit job and wait until running
- ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
- flink.submitJobDetached(jobGraph);
-
- Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
- Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
- Await.ready(runningFuture, deadline.timeLeft());
+ flink.runDetached(jobGraph);
- // Wait for blocking requests, cancel and wait for cancellation
- msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
- Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> flink.getJobStatus(jobGraph.getJobID()),
+ Time.milliseconds(10),
+ deadline,
+ status -> status == JobStatus.RUNNING,
+ TestingUtils.defaultScheduledExecutor()
+ ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
boolean producerBlocked = false;
for (int i = 0; i < 50; i++) {
@@ -156,11 +156,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
// Verify that async consumer is in blocking request
assertTrue("Consumer thread is not blocked.", consumerWaiting);
- msg = new CancelJob(jobGraph.getJobID());
- Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
- Await.ready(cancelFuture, deadline.timeLeft());
+ flink.cancelJob(jobGraph.getJobID())
+ .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- Await.ready(cancelledFuture, deadline.timeLeft());
+ // wait until the job is canceled
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> flink.getJobStatus(jobGraph.getJobID()),
+ Time.milliseconds(10),
+ deadline,
+ status -> status == JobStatus.CANCELED,
+ TestingUtils.defaultScheduledExecutor()
+ ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
// Verify the expected Exceptions
assertNotNull(ASYNC_PRODUCER_EXCEPTION);
@@ -168,10 +174,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
assertNotNull(ASYNC_CONSUMER_EXCEPTION);
assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
- } finally {
- if (flink != null) {
- flink.stop();
- }
}
}