You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/25 21:41:41 UTC
[flink] 01/02: [FLINK-11390][tests] Port testTaskManagerFailure to new codebase.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c95e9f642288bb2816cf84868709ea2543a90ae5
Author: Gary Yao <ga...@data-artisans.com>
AuthorDate: Fri Jan 18 22:32:22 2019 +0100
[FLINK-11390][tests] Port testTaskManagerFailure to new codebase.
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to flip6 codebase:
* Remove assertions that rely on log messages only
* Move part where TMs are killed to YARNHighAvailabilityITCase
* Rename test to a proper name that describes what it does
* Add Javadoc explaining what this test does
[FLINK-11390][tests] Move comment to right position
[FLINK-11390][tests] Reuse YarnClient from super class
Move waitUntilCondition to YarnTestBase
[FLINK-11390][tests] Extract method parseJobManagerHostname
Extract method getOnlyApplicationReport
Extract method submitJob
Extract method getNumberOfSlotsPerTaskManager
Extract method getFlinkConfigFromRestApi
Delete useless comment
Rename: runner -> yarnSessionClusterRunner
Delete useless sleep & refactor
Reorder methods and add static keyword where possible
This closes #7546.
---
.../client/program/rest/RestClusterClient.java | 21 +-
.../org/apache/flink/runtime/rest/RestClient.java | 9 +
.../job/metrics/JobMetricsMessageParameters.java | 2 +-
.../flink/yarn/YARNHighAvailabilityITCase.java | 232 +++++++++++---
.../yarn/YARNSessionCapacitySchedulerITCase.java | 345 +++++++++++----------
.../java/org/apache/flink/yarn/YarnTestBase.java | 20 +-
.../org/apache/flink/yarn/testjob/YarnTestJob.java | 56 +++-
7 files changed, 467 insertions(+), 218 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index c6dc37e..eea83c6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -272,17 +272,25 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
}
}
- @Override
- public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
- JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+ /**
+ * Requests the job details.
+ *
+ * @param jobId The job id
+ * @return Job details
+ */
+ public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
+ final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
final JobMessageParameters params = new JobMessageParameters();
params.jobPathParameter.resolve(jobId);
- CompletableFuture<JobDetailsInfo> responseFuture = sendRequest(
+ return sendRequest(
detailsHeaders,
params);
+ }
- return responseFuture.thenApply(JobDetailsInfo::getJobStatus);
+ @Override
+ public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+ return getJobDetails(jobId).thenApply(JobDetailsInfo::getJobStatus);
}
/**
@@ -694,7 +702,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
}
- private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+ @VisibleForTesting
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
sendRequest(M messageHeaders, U messageParameters, R request) {
return sendRetriableRequest(
messageHeaders, messageParameters, request, isConnectionProblemOrServiceUnavailable());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a25b13c..c478cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -185,6 +187,13 @@ public class RestClient implements AutoCloseableAsync {
return terminationFuture;
}
+ public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders) throws IOException {
+ return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+ }
+
public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
String targetAddress,
int targetPort,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
index f8bab83..3f6f8af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsMessageParameters.java
@@ -31,7 +31,7 @@ import java.util.Collections;
*/
public class JobMetricsMessageParameters extends JobMessageParameters {
- private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+ public final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 1a2eb92..de7e02a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,28 +25,43 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
import org.apache.flink.yarn.testjob.YarnTestJob;
import org.apache.flink.yarn.util.YarnTestUtils;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -58,13 +73,21 @@ import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;
@@ -78,11 +101,13 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
private static final String LOG_DIR = "flink-yarn-tests-ha";
private static final Duration TIMEOUT = Duration.ofSeconds(200L);
- private static final long RETRY_TIMEOUT = 100L;
private static TestingServer zkServer;
private static String storageDir;
+ private YarnTestJob.StopJobSignal stopJobSignal;
+ private JobGraph job;
+
@BeforeClass
public static void setup() throws Exception {
zkServer = new TestingServer();
@@ -104,6 +129,22 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
}
}
+ @Before
+ public void setUp() throws Exception {
+ initJobGraph();
+ }
+
+ private void initJobGraph() throws IOException {
+ stopJobSignal = YarnTestJob.StopJobSignal.usingMarkerFile(FOLDER.newFile().toPath());
+ job = YarnTestJob.stoppableJob(stopJobSignal);
+ final File testingJar =
+ YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+
+ assertThat(testingJar, notNullValue());
+
+ job.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+ }
+
/**
* Tests that Yarn will restart a killed {@link YarnSessionClusterEntrypoint} which will then resume
* a persisted {@link JobGraph}.
@@ -115,33 +156,107 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
-
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
- final JobGraph job = createJobGraph();
+ try {
+ final JobID jobId = submitJob(restClusterClient);
+ final ApplicationId id = restClusterClient.getClusterId();
+
+ waitUntilJobIsRunning(restClusterClient, jobId);
- final JobID jobId = submitJob(restClusterClient, job);
+ killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+ waitForApplicationAttempt(id, 2);
- final ApplicationId id = restClusterClient.getClusterId();
+ waitForJobTermination(restClusterClient, jobId);
- waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+ killApplicationAndWait(id);
+ } finally {
+ restClusterClient.shutdown();
+ }
+ }
+
+ @Test
+ public void testJobRecoversAfterKillingTaskManager() throws Exception {
+ final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
+ final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
+ try {
+ final JobID jobId = submitJob(restClusterClient);
+ waitUntilJobIsRunning(restClusterClient, jobId);
+
+ stopTaskManagerContainer();
+ waitUntilJobIsRestarted(restClusterClient, jobId, 1);
- killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+ waitForJobTermination(restClusterClient, jobId);
+ killApplicationAndWait(restClusterClient.getClusterId());
+ } finally {
+ restClusterClient.shutdown();
+ }
+ }
+
+ private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId) throws Exception {
final YarnClient yarnClient = getYarnClient();
- Assert.assertNotNull(yarnClient);
+ checkState(yarnClient != null, "yarnClient must be initialized");
- while (yarnClient.getApplicationReport(id).getCurrentApplicationAttemptId().getAttemptId() < 2) {
- Thread.sleep(RETRY_TIMEOUT);
+ waitUntilCondition(() -> {
+ final ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);
+ return applicationReport.getCurrentApplicationAttemptId().getAttemptId() >= attemptId;
+ }, Deadline.fromNow(TIMEOUT));
+ }
+
+ /**
+ * Stops a container running {@link YarnTaskExecutorRunner}.
+ */
+ private void stopTaskManagerContainer() throws Exception {
+ // find container id of taskManager:
+ ContainerId taskManagerContainer = null;
+ NodeManager nodeManager = null;
+ NMTokenIdentifier nmIdent = null;
+ UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+
+ for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+ NodeManager nm = yarnCluster.getNodeManager(nmId);
+ ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+ for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
+ String command = StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
+ if (command.contains(YarnTaskExecutorRunner.class.getSimpleName())) {
+ taskManagerContainer = entry.getKey();
+ nodeManager = nm;
+ nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
+ // allow myself to do stuff with the container
+ // remoteUgi.addCredentials(entry.getValue().getCredentials());
+ remoteUgi.addTokenIdentifier(nmIdent);
+ }
+ }
}
- waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+ assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
+ assertNotNull("Illegal state", nodeManager);
+
+ StopContainersRequest scr = StopContainersRequest.newInstance(Collections.singletonList(taskManagerContainer));
+
+ nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+
+ // cleanup auth for the subsequent tests.
+ remoteUgi.getTokenIdentifiers().remove(nmIdent);
+ }
+
+ private void killApplicationAndWait(final ApplicationId id) throws Exception {
+ final YarnClient yarnClient = getYarnClient();
+ checkState(yarnClient != null, "yarnClient must be initialized");
yarnClient.killApplication(id);
- while (yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FINISHED)).isEmpty()) {
- Thread.sleep(RETRY_TIMEOUT);
- }
+ waitUntilCondition(() -> !yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FINISHED)).isEmpty(),
+ Deadline.fromNow(TIMEOUT));
+ }
+
+ private void waitForJobTermination(
+ final RestClusterClient<ApplicationId> restClusterClient,
+ final JobID jobId) throws Exception {
+ stopJobSignal.signal();
+ final CompletableFuture<JobResult> jobResult = restClusterClient.requestJobResult(jobId);
+ jobResult.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
}
@Nonnull
@@ -153,6 +268,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
flinkConfiguration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
+ flinkConfiguration.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
+ flinkConfiguration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+
final int minMemory = 100;
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, minMemory);
@@ -172,8 +290,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
return (RestClusterClient<ApplicationId>) yarnClusterClient;
}
- private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient, JobGraph job) throws InterruptedException, java.util.concurrent.ExecutionException {
- final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture = restClusterClient.submitJob(job);
+ private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient) throws InterruptedException, java.util.concurrent.ExecutionException {
+ final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture =
+ restClusterClient.submitJob(job);
final JobSubmissionResult jobSubmissionResult = jobSubmissionResultCompletableFuture.get();
return jobSubmissionResult.getJobID();
@@ -184,38 +303,61 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
assertThat(exec.waitFor(), is(0));
}
- @Nonnull
- private JobGraph createJobGraph() {
- final JobGraph job = YarnTestJob.createJob();
- final File testingJar =
- YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+ private static void waitUntilJobIsRunning(RestClusterClient<ApplicationId> restClusterClient, JobID jobId) throws Exception {
+ waitUntilCondition(
+ () -> {
+ final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobId).get();
+ return jobDetails.getJobVertexInfos()
+ .stream()
+ .map(toExecutionState())
+ .allMatch(isRunning());
+ },
+ Deadline.fromNow(TIMEOUT));
+ }
- assertThat(testingJar, notNullValue());
+ private static Function<JobDetailsInfo.JobVertexDetailsInfo, ExecutionState> toExecutionState() {
+ return JobDetailsInfo.JobVertexDetailsInfo::getExecutionState;
+ }
- job.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
- return job;
+ private static Predicate<ExecutionState> isRunning() {
+ return executionState -> executionState == ExecutionState.RUNNING;
}
- private void waitUntilJobIsRunning(RestClusterClient<ApplicationId> restClusterClient, JobID jobId, long retryTimeout) throws Exception {
+ private static void waitUntilJobIsRestarted(
+ final RestClusterClient<ApplicationId> restClusterClient,
+ final JobID jobId,
+ final int expectedFullRestarts) throws Exception {
waitUntilCondition(
- () -> {
- final Collection<JobStatusMessage> jobStatusMessages = restClusterClient.listJobs().get();
+ () -> getJobFullRestarts(restClusterClient, jobId) >= expectedFullRestarts,
+ Deadline.fromNow(TIMEOUT));
+ }
- return jobStatusMessages.stream()
- .filter(jobStatusMessage -> jobStatusMessage.getJobId().equals(jobId))
- .anyMatch(jobStatusMessage -> jobStatusMessage.getJobState() == JobStatus.RUNNING);
- },
- Deadline.fromNow(TIMEOUT),
- retryTimeout);
+ private static int getJobFullRestarts(
+ final RestClusterClient<ApplicationId> restClusterClient,
+ final JobID jobId) throws Exception {
+
+ return getJobMetric(restClusterClient, jobId, "fullRestarts")
+ .map(Metric::getValue)
+ .map(Integer::parseInt)
+ .orElse(0);
}
- private void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryTimeout) throws Exception {
- while (timeout.hasTimeLeft() && !condition.get()) {
- Thread.sleep(Math.min(retryTimeout, timeout.timeLeft().toMillis()));
- }
+ private static Optional<Metric> getJobMetric(
+ final RestClusterClient<ApplicationId> restClusterClient,
+ final JobID jobId,
+ final String metricName) throws Exception {
- if (!timeout.hasTimeLeft()) {
- throw new TimeoutException("Condition was not met in given timeout.");
- }
+ final JobMetricsMessageParameters messageParameters = new JobMetricsMessageParameters();
+ messageParameters.jobPathParameter.resolve(jobId);
+ messageParameters.metricsFilterParameter.resolveFromString(metricName);
+
+ final Collection<Metric> metrics = restClusterClient.sendRequest(
+ JobMetricsHeaders.getInstance(),
+ messageParameters,
+ EmptyRequestBody.getInstance()).get().getMetrics();
+
+ final Metric metric = Iterables.getOnlyElement(metrics, null);
+ checkState(metric == null || metric.getId().equals(metricName));
+ return Optional.ofNullable(metric);
}
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 83abd8a..93b4c9d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,40 +18,43 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.guava18.com.google.common.net.HostAndPort;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.log4j.Level;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -63,23 +66,31 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
-import static org.junit.Assume.assumeTrue;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* This test starts a MiniYARNCluster with a CapacityScheduler.
@@ -88,14 +99,46 @@ import static org.junit.Assume.assumeTrue;
public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+ /**
+ * RestClient to query Flink cluster.
+ */
+ private static RestClient restClient;
+
+ /**
+ * ExecutorService for {@link RestClient}.
+ * @see #restClient
+ */
+ private static ExecutorService restClientExecutor;
+
+ /** Toggles checking for prohibited strings in logs after the test has run. */
+ private boolean checkForProhibitedLogContents = true;
+
@BeforeClass
- public static void setup() {
+ public static void setup() throws Exception {
YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
startYARNWithConfig(YARN_CONFIGURATION);
+
+ restClientExecutor = Executors.newSingleThreadExecutor();
+ restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), restClientExecutor);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ try {
+ YarnTestBase.teardown();
+ } finally {
+ if (restClient != null) {
+ restClient.shutdown(Time.seconds(5));
+ }
+
+ if (restClientExecutor != null) {
+ restClientExecutor.shutdownNow();
+ }
+ }
}
/**
@@ -184,14 +227,23 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
}
/**
- * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
+ * Starts a session cluster on YARN, and submits a streaming job.
+ *
+ * <p>Tests
+ * <ul>
+ * <li>if a custom YARN application name can be set from the command line,
+ * <li>if the number of TaskManager slots can be set from the command line,
+ * <li>if dynamic properties from the command line are set,
+ * <li>if the vcores are set correctly (FLINK-2213),
+ * <li>if the jobmanager hostname is shown in web interface (FLINK-1902)
+ * </ul>
+ *
+ * <p><b>Hint: </b> If you think it is a good idea to add more assertions to this test, think again!
*/
- @Test(timeout = 100000) // timeout after 100 seconds
- public void testTaskManagerFailure() throws Exception {
- assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
- LOG.info("Starting testTaskManagerFailure()");
- Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
- "-n", "1",
+ @Test(timeout = 100_000)
+ public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception {
+ // NOTE(review): disables the @After prohibited-log-string check for this test; the reason is not documented here — confirm
+ checkForProhibitedLogContents = false;
+ final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the vCores are set properly!
@@ -199,163 +251,134 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
- "Number of connected TaskManagers changed to 1. Slots available: 3",
+ "Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
- Assert.assertEquals(2, getRunningContainers());
-
- // ------------------------ Test if JobManager web interface is accessible -------
-
- final YarnClient yc = YarnClient.createYarnClient();
- yc.init(YARN_CONFIGURATION);
- yc.start();
-
- List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
- Assert.assertEquals(1, apps.size()); // Only one running
- ApplicationReport app = apps.get(0);
- Assert.assertEquals("customName", app.getName());
- String url = app.getTrackingUrl();
- if (!url.endsWith("/")) {
- url += "/";
- }
- if (!url.startsWith("http://")) {
- url = "http://" + url;
- }
- LOG.info("Got application URL from YARN {}", url);
-
- String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
-
- JsonNode parsedTMs = new ObjectMapper().readTree(response);
- ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
- Assert.assertNotNull(taskManagers);
- Assert.assertEquals(1, taskManagers.size());
- Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
-
- // get the configuration from webinterface & check if the dynamic properties from YARN show up there.
- String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
- Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
-
- Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
- Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
- Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
+ final String logs = outContent.toString();
+ final HostAndPort hostAndPort = parseJobManagerHostname(logs);
+ final String host = hostAndPort.getHostText();
+ final int port = hostAndPort.getPort();
+ LOG.info("Extracted hostname:port: {}:{}", host, port);
+
+ submitJob("WindowJoin.jar");
+
+ //
+ // Assert that custom YARN application name "customName" is set
+ //
+ final ApplicationReport applicationReport = getOnlyApplicationReport();
+ assertEquals("customName", applicationReport.getName());
+
+ //
+ // Assert the number of TaskManager slots are set
+ //
+ waitForTaskManagerRegistration(host, port, Duration.ofMillis(30_000));
+ assertNumberOfSlotsPerTask(host, port, 3);
+
+ final Map<String, String> flinkConfig = getFlinkConfig(host, port);
+
+ //
+ // Assert dynamic properties
+ //
+ assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
+ assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3"));
+
+ //
+ // FLINK-2213: assert that vcores are set
+ //
+ assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2"));
+
+ //
+ // FLINK-1902: check if jobmanager hostname is shown in web interface
+ //
+ assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), host));
+
+ yarnSessionClusterRunner.sendStop();
+ yarnSessionClusterRunner.join();
+ }
- // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
- // first, get the hostname/port
- String oC = outContent.toString();
- Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
- Matcher matches = p.matcher(oC);
+ /**
+ * Extracts the JobManager address from captured client output.
+ *
+ * <p>Scans {@code logs} for lines of the form "Flink JobManager is now running on host:port".
+ * The returned hostname is lower-cased.
+ *
+ * @param logs captured output of the YARN session client
+ * @return hostname and port taken from the last matching line
+ * @throws IllegalStateException (via checkState) if no matching line is found
+ */
+ private static HostAndPort parseJobManagerHostname(final String logs) {
+ final Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
+ final Matcher matches = p.matcher(logs);
String hostname = null;
String port = null;
+
+ // if the line occurs several times, the last occurrence wins
while (matches.find()) {
hostname = matches.group(1).toLowerCase();
port = matches.group(2);
}
- LOG.info("Extracted hostname:port: {} {}", hostname, port);
-
- Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
- parsedConfig.get(JobManagerOptions.ADDRESS.key()));
- Assert.assertEquals("unable to find port in " + jsonConfig, port,
- parsedConfig.get(JobManagerOptions.PORT.key()));
-
- // test logfile access
- String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
- Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
- Assert.assertTrue(logs.contains("Starting JobManager"));
- Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
-
- // ------------------------ Kill container with TaskManager and check if vcores are set correctly -------
-
- // find container id of taskManager:
- ContainerId taskManagerContainer = null;
- NodeManager nodeManager = null;
- UserGroupInformation remoteUgi = null;
- NMTokenIdentifier nmIdent = null;
- try {
- remoteUgi = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- LOG.warn("Unable to get curr user", e);
- Assert.fail();
- }
- for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
- NodeManager nm = yarnCluster.getNodeManager(nmId);
- ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
- for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
- String command = StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
- if (command.contains(YarnTaskManager.class.getSimpleName())) {
- taskManagerContainer = entry.getKey();
- nodeManager = nm;
- nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
- // allow myself to do stuff with the container
- // remoteUgi.addCredentials(entry.getValue().getCredentials());
- remoteUgi.addTokenIdentifier(nmIdent);
- }
- }
- sleep(500);
- }
- Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
- Assert.assertNotNull("Illegal state", nodeManager);
+ checkState(hostname != null, "hostname not found in log");
+ checkState(port != null, "port not found in log");
- yc.stop();
+ return HostAndPort.fromParts(hostname, Integer.parseInt(port));
+ }
- List<ContainerId> toStop = new LinkedList<ContainerId>();
- toStop.add(taskManagerContainer);
- StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
+ /**
+ * Returns the report of the only application that the YARN client lists as RUNNING.
+ *
+ * <p>Fails the test if the number of RUNNING applications is not exactly one.
+ */
+ private ApplicationReport getOnlyApplicationReport() throws IOException, YarnException {
+ final YarnClient yarnClient = getYarnClient();
+ checkState(yarnClient != null);
- try {
- nodeManager.getNMContext().getContainerManager().stopContainers(scr);
- } catch (Throwable e) {
- LOG.warn("Error stopping container", e);
- Assert.fail("Error stopping container: " + e.getMessage());
- }
+ final List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+ assertEquals(1, apps.size()); // Only one running
+ return apps.get(0);
+ }
- // stateful termination check:
- // wait until we saw a container being killed and AFTERWARDS a new one launched
- boolean ok = false;
- do {
- LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
-
- String o = errContent.toString();
- int killedOff = o.indexOf("Container killed by the ApplicationMaster");
- if (killedOff != -1) {
- o = o.substring(killedOff);
- ok = o.indexOf("Launching TaskManager") > 0;
- }
- sleep(1000);
- } while(!ok);
+ /**
+ * Submits the given test jar with {@code run --detached} through the CLI frontend and joins the
+ * resulting runner.
+ *
+ * @param jobFileName file name of the job jar, resolved via {@code getTestJarPath}
+ */
+ private void submitJob(final String jobFileName) throws IOException, InterruptedException {
+ Runner jobRunner = startWithArgs(new String[]{"run",
+ "--detached", getTestJarPath(jobFileName).getAbsolutePath()},
+ "Job has been submitted with JobID", RunTypes.CLI_FRONTEND);
+ jobRunner.join();
+ }
+
+ /**
+ * Polls the cluster overview REST endpoint until at least one TaskManager is reported as
+ * connected, failing with a {@code TimeoutException} once {@code waitDuration} has elapsed.
+ */
+ private static void waitForTaskManagerRegistration(
+ final String host,
+ final int port,
+ final Duration waitDuration) throws Exception {
+ waitUntilCondition(() -> getNumberOfTaskManagers(host, port) > 0, Deadline.fromNow(waitDuration));
+ }
- // send "stop" command to command line interface
- runner.sendStop();
- // wait for the thread to stop
+ /**
+ * Waits up to 30 seconds until the first TaskManager reported by the REST API has
+ * {@code slotsNumber} slots. On timeout, re-queries the slot count and fails the test with it.
+ *
+ * <p>NOTE(review): despite the name, this checks slots per TaskManager, not per task.
+ */
+ private static void assertNumberOfSlotsPerTask(
+ final String host,
+ final int port,
+ final int slotsNumber) throws Exception {
try {
- runner.join();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while stopping runner", e);
+ waitUntilCondition(() -> getNumberOfSlotsPerTaskManager(host, port) == slotsNumber, Deadline.fromNow(Duration.ofSeconds(30)));
+ } catch (final TimeoutException e) {
+ final int currentNumberOfSlots = getNumberOfSlotsPerTaskManager(host, port);
+ fail(String.format("Expected slots per TM to be %d, was: %d", slotsNumber, currentNumberOfSlots));
}
- LOG.warn("stopped");
-
- // ----------- Send output to logger
- System.setOut(ORIGINAL_STDOUT);
- System.setErr(ORIGINAL_STDERR);
- oC = outContent.toString();
- String eC = errContent.toString();
- LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
- LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
+ }
- // ------ Check if everything happened correctly
- Assert.assertTrue("Expect to see failed container",
- eC.contains("New messages from the YARN cluster"));
+ /**
+ * Queries the cluster overview REST endpoint and returns the number of connected TaskManagers.
+ * The request is bounded by a 30 second timeout.
+ */
+ private static int getNumberOfTaskManagers(final String host, final int port) throws Exception {
+ final ClusterOverviewWithVersion clusterOverviewWithVersion = restClient.sendRequest(
+ host,
+ port,
+ ClusterOverviewHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
- Assert.assertTrue("Expect to see failed container",
- eC.contains("Container killed by the ApplicationMaster"));
+ return clusterOverviewWithVersion.getNumTaskManagersConnected();
+ }
- Assert.assertTrue("Expect to see new container started",
- eC.contains("Launching TaskManager") && eC.contains("on host"));
+ /**
+ * Returns the slot count of the first TaskManager listed by the REST API, or 0 if none is listed.
+ *
+ * <p>The request is bounded by a 30 second timeout, like {@code getNumberOfTaskManagers}, so that an
+ * unresponsive REST endpoint cannot block a polling caller indefinitely.
+ */
+ private static int getNumberOfSlotsPerTaskManager(final String host, final int port) throws Exception {
+ final TaskManagersInfo taskManagersInfo = restClient.sendRequest(
+ host,
+ port,
+ TaskManagersHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
+
+ return taskManagersInfo.getTaskManagerInfos()
+ .stream()
+ .map(TaskManagerInfo::getNumberSlots)
+ .findFirst()
+ .orElse(0);
+ }
- // cleanup auth for the subsequent tests.
- remoteUgi.getTokenIdentifiers().remove(nmIdent);
+ /**
+ * Fetches the cluster configuration from the REST API as a key/value map.
+ *
+ * <p>The request is bounded by a 30 second timeout, like {@code getNumberOfTaskManagers}, so that an
+ * unresponsive REST endpoint cannot block the test indefinitely. Duplicate keys would make
+ * {@code Collectors.toMap} throw an {@code IllegalStateException}.
+ */
+ private static Map<String, String> getFlinkConfig(final String host, final int port) throws Exception {
+ final ClusterConfigurationInfo clusterConfigurationInfoEntries = restClient.sendRequest(
+ host,
+ port,
+ ClusterConfigurationInfoHeaders.getInstance()).get(30_000, TimeUnit.MILLISECONDS);
- LOG.info("Finished testTaskManagerFailure()");
+ return clusterConfigurationInfoEntries.stream().collect(Collectors.toMap(
+ ClusterConfigurationInfoEntry::getKey,
+ ClusterConfigurationInfoEntry::getValue));
}
/**
@@ -620,11 +643,13 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
@SuppressWarnings("unchecked")
Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
- Assert.assertEquals(Collections.singleton("test-tag"), applicationTags);
+ assertEquals(Collections.singleton("test-tag"), applicationTags);
}
+ /**
+ * Runs {@code ensureNoProhibitedStringInLogFiles} with {@code PROHIBITED_STRINGS} and
+ * {@code WHITELISTED_STRINGS} after each test. The check is skipped when the test has cleared
+ * the {@code checkForProhibitedLogContents} flag (a field that shares this method's name).
+ */
@After
public void checkForProhibitedLogContents() {
- ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
+ if (checkForProhibitedLogContents) {
+ ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
+ }
}
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index b7b08ae..242588f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.commons.io.FileUtils;
@@ -81,6 +82,7 @@ import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -104,6 +106,8 @@ public abstract class YarnTestBase extends TestLogger {
protected static final int NUM_NODEMANAGERS = 2;
+ /** Upper bound in milliseconds for the sleep between two checks in {@code waitUntilCondition}. */
+ private static final long RETRY_TIMEOUT = 100L;
+
/** The tests are scanning for these strings in the final output. */
protected static final String[] PROHIBITED_STRINGS = {
"Exception", // we don't want any exceptions to happen
@@ -122,7 +126,11 @@ public abstract class YarnTestBase extends TestLogger {
"java.io.IOException: Connection reset by peer",
// this can happen in Akka 2.4 on shutdown.
- "java.util.concurrent.RejectedExecutionException: Worker has already been shutdown"
+ "java.util.concurrent.RejectedExecutionException: Worker has already been shutdown",
+
+ "org.apache.flink.util.FlinkException: Stopping JobMaster",
+ "org.apache.flink.util.FlinkException: JobManager is shutting down.",
+ "lost the leadership."
};
// Temp directory which is deleted after the unit test.
@@ -271,6 +279,16 @@ public abstract class YarnTestBase extends TestLogger {
return null;
}
+ /**
+ * Polls {@code condition} until it returns {@code true}, sleeping at most {@link #RETRY_TIMEOUT}
+ * milliseconds between two checks.
+ *
+ * <p>The condition is evaluated at least once, and once more after every sleep. The outcome is
+ * decided by the last evaluation, not by the clock. A condition that turns true right at the
+ * deadline is therefore not reported as a timeout.
+ *
+ * @param condition check to poll; exceptions it throws are propagated
+ * @param timeout deadline after which polling stops
+ * @throws TimeoutException if the condition is still false once the deadline has passed
+ */
+ protected static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout) throws Exception {
+ boolean conditionMet = condition.get();
+ while (!conditionMet && timeout.hasTimeLeft()) {
+ // timeLeft() can already be negative if the check above was slow; Thread.sleep rejects negative values
+ Thread.sleep(Math.max(0L, Math.min(RETRY_TIMEOUT, timeout.timeLeft().toMillis())));
+ conditionMet = condition.get();
+ }
+
+ if (!conditionMet) {
+ throw new TimeoutException("Condition was not met in given timeout.");
+ }
+ }
+
@Nonnull
YarnClusterDescriptor createYarnClusterDescriptor(org.apache.flink.configuration.Configuration flinkConfiguration) {
final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
index f86d6e4..59bad7e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
@@ -23,6 +23,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
/**
* Testing job for {@link org.apache.flink.runtime.jobmaster.JobMaster} failover.
* Covering stream case that have a infinite source and a sink, scheduling by
@@ -30,10 +36,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
*/
public class YarnTestJob {
- public static JobGraph createJob() {
+ public static JobGraph stoppableJob(final StopJobSignal stopJobSignal) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.addSource(new InfiniteSourceFunction())
+ env.addSource(new InfiniteSourceFunction(stopJobSignal))
.setParallelism(2)
.shuffle()
.addSink(new DiscardingSink<>())
@@ -42,21 +48,61 @@ public class YarnTestJob {
return env.getStreamGraph().getJobGraph();
}
+ /**
+ * Helper class to signal between multiple processes that a job should stop.
+ *
+ * <p>The signal is a marker file: the job should keep running while the file exists, and
+ * {@link #signal()} deletes it. The path is stored as a {@code String} since
+ * {@link Path} does not implement {@link Serializable}.
+ */
+ public static class StopJobSignal implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String stopJobMarkerFile;
+
+ /**
+ * Creates a signal backed by the given marker file. The job is considered signaled as soon as
+ * the file no longer exists.
+ */
+ public static StopJobSignal usingMarkerFile(final Path stopJobMarkerFile) {
+ return new StopJobSignal(stopJobMarkerFile.toString());
+ }
+
+ private StopJobSignal(final String stopJobMarkerFile) {
+ this.stopJobMarkerFile = stopJobMarkerFile;
+ }
+
+ /**
+ * Signals that the job should stop by deleting the marker file.
+ *
+ * @throws UncheckedIOException if the marker file cannot be deleted, e.g. because it does not exist
+ */
+ public void signal() {
+ try {
+ Files.delete(Paths.get(stopJobMarkerFile));
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * True if job should stop, i.e. the marker file no longer exists.
+ */
+ public boolean isSignaled() {
+ return !Files.exists(Paths.get(stopJobMarkerFile));
+ }
+
+ }
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
private static final class InfiniteSourceFunction extends RichParallelSourceFunction<Integer> {
+
private static final long serialVersionUID = -8758033916372648233L;
+
private boolean running;
- InfiniteSourceFunction() {
- running = true;
+ private final StopJobSignal stopJobSignal;
+
+ InfiniteSourceFunction(final StopJobSignal stopJobSignal) {
+ this.running = true;
+ this.stopJobSignal = stopJobSignal;
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
- while (running) {
+ while (running && !stopJobSignal.isSignaled()) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(0);
}