You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/23 16:44:28 UTC
[flink] 01/02: [FLINK-10369][tests] Enable YARNITCase to test per
job mode deployment
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6a14c02d898b881903e62ed847bf595beaea7cd
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 19 15:09:54 2018 +0200
[FLINK-10369][tests] Enable YARNITCase to test per job mode deployment
This closes #6717.
---
.../java/org/apache/flink/yarn/YARNITCase.java | 68 +++++++++++++---------
1 file changed, 39 insertions(+), 29 deletions(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a098..56cfdb6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -32,12 +34,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.util.Arrays;
-import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
/**
* Test cases for the deployment of Yarn Flink clusters.
@@ -50,7 +56,6 @@ public class YARNITCase extends YarnTestBase {
startYARNWithConfig(YARN_CONFIGURATION);
}
- @Ignore("The cluster cannot be stopped yet.")
@Test
public void testPerJobMode() throws Exception {
Configuration configuration = new Configuration();
@@ -77,9 +82,9 @@ public class YARNITCase extends YarnTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
- env.addSource(new InfiniteSource())
+ env.addSource(new NoDataSource())
.shuffle()
- .addSink(new DiscardingSink<Integer>());
+ .addSink(new DiscardingSink<>());
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -87,40 +92,45 @@ public class YARNITCase extends YarnTestBase {
jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
- ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
- clusterSpecification,
- jobGraph,
- true);
+ ApplicationId applicationId = null;
+ ClusterClient<ApplicationId> clusterClient = null;
- clusterClient.shutdown();
- }
- }
+ try {
+ clusterClient = yarnClusterDescriptor.deployJobCluster(
+ clusterSpecification,
+ jobGraph,
+ false);
+ applicationId = clusterClient.getClusterId();
- private static class InfiniteSource implements ParallelSourceFunction<Integer> {
+ assertThat(clusterClient, is(instanceOf(RestClusterClient.class)));
+ final RestClusterClient<ApplicationId> restClusterClient = (RestClusterClient<ApplicationId>) clusterClient;
- private static final long serialVersionUID = 1642561062000662861L;
- private volatile boolean running;
- private final Random random;
+ final CompletableFuture<JobResult> jobResultCompletableFuture = restClusterClient.requestJobResult(jobGraph.getJobID());
- InfiniteSource() {
- running = true;
- random = new Random();
- }
+ final JobResult jobResult = jobResultCompletableFuture.get();
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- while (running) {
- synchronized (ctx.getCheckpointLock()) {
- ctx.collect(random.nextInt());
+ assertThat(jobResult, is(notNullValue()));
+ assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+ } finally {
+ if (clusterClient != null) {
+ clusterClient.shutdown();
}
- Thread.sleep(5L);
+ if (applicationId != null) {
+ yarnClusterDescriptor.killCluster(applicationId);
+ }
}
}
+ }
+
+ private static class NoDataSource implements ParallelSourceFunction<Integer> {
+
+ private static final long serialVersionUID = 1642561062000662861L;
@Override
- public void cancel() {
- running = false;
- }
+ public void run(SourceContext<Integer> ctx) {}
+
+ @Override
+ public void cancel() {}
}
}