You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 14:35:00 UTC
[flink] 01/08: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff97d1cb3efbac4f47abc0a38bac932a55fc1288
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 00:28:30 2018 +0200
[FLINK-10401] Port ProcessFailureCancelingITCase to new code base
This closes #6749.
---
.../DispatcherResourceManagerComponent.java | 10 +
.../flink/runtime/util/BlobServerResource.java | 4 +
...tractTaskManagerProcessFailureRecoveryTest.java | 29 +++
.../recovery/ProcessFailureCancelingITCase.java | 256 ++++++++++-----------
4 files changed, 159 insertions(+), 140 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2..b07095c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements
return shutDownFuture;
}
+ /**
+ * Returns the dispatcher of this component.
+ *
+ * @return the dispatcher held in the {@code dispatcher} field (annotated {@code @Nonnull})
+ */
+ @Nonnull
+ public T getDispatcher() {
+ return dispatcher;
+ }
+
+ /**
+ * Returns the web monitor (REST) endpoint of this component.
+ *
+ * <p>Callers can use it to discover the address the endpoint is actually bound to, e.g.
+ * when the REST port was configured as {@code 0} and the real port is only known after
+ * startup.
+ *
+ * @return the endpoint held in the {@code webMonitorEndpoint} field (annotated {@code @Nonnull})
+ */
+ @Nonnull
+ public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+ return webMonitorEndpoint;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf8..654b2bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ public class BlobServerResource extends ExternalResource {
public int getBlobServerPort() {
return blobServer.getPort();
}
+
+ /**
+ * Returns the {@link BlobServer} instance held by this resource, so that tests can hand it
+ * directly to components that require a blob server.
+ *
+ * @return the blob server held by this resource
+ */
+ public BlobServer getBlobServer() {
+ return blobServer;
+ }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327ad..5cd6f30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -421,4 +422,32 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
}
}
+ /**
+ * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
+ *
+ * <p>Usage: {@code TaskExecutorProcessEntryPoint <jobManagerPort>}. The JobManager address is
+ * always configured as {@code localhost}, with the RPC port taken from the first argument.
+ * Any {@link Throwable} thrown while parsing the argument or starting the TaskExecutor is
+ * logged and the JVM is terminated with exit code 1.
+ */
+ public static class TaskExecutorProcessEntryPoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
+
+ public static void main(String[] args) {
+ try {
+ // the only expected argument is the JobManager's RPC port
+ int jobManagerPort = Integer.parseInt(args[0]);
+
+ Configuration cfg = new Configuration();
+ cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+ cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
+ // NOTE(review): small memory settings, presumably so the process fits into the small
+ // heap the tests start it with (e.g. -Xmx80m) -- TODO confirm
+ cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+ cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+ cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+
+ TaskManagerRunner.runTaskManager(cfg, ResourceID.generate());
+ }
+ catch (Throwable t) {
+ // any startup failure terminates the JVM with a non-zero exit code
+ LOG.error("Failed to start TaskManager process", t);
+ System.exit(1);
+ }
+ }
+ }
+
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410..afca8f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@ package org.apache.flink.test.recovery;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
import org.junit.Test;
import java.io.File;
import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class ProcessFailureCancelingITCase extends TestLogger {
+ @Rule
+ public final BlobServerResource blobServerResource = new BlobServerResource();
+
@Test
public void testCancelingOnProcessFailure() throws Exception {
final StringWriter processOutput = new StringWriter();
+ final Time timeout = Time.minutes(2L);
- ActorSystem jmActorSystem = null;
+ RestClusterClient<String> clusterClient = null;
Process taskManagerProcess = null;
- HighAvailabilityServices highAvailabilityServices = null;
+ final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+ Configuration jmConfig = new Configuration();
+ jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+ jmConfig.setInteger(RestOptions.PORT, 0);
+
+ final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+ final int jobManagerPort = rpcService.getPort();
+ jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+ final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory(
+ StandaloneResourceManagerFactory.INSTANCE);
+ DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null;
+
+ try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ jmConfig,
+ TestingUtils.defaultExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
- try {
// check that we run this test only if the java command
// is available on this machine
String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public class ProcessFailureCancelingITCase extends TestLogger {
tempLogFile.deleteOnExit();
CommonTestUtils.printLog4jDebugConfig(tempLogFile);
- // find a free port to start the JobManager
- final int jobManagerPort = NetUtils.getAvailablePort();
-
- // start a JobManager
- Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
- jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
- jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
- jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
- jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
- jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- jmConfig,
- TestingUtils.defaultExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
- jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(
+ dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(
jmConfig,
- jmActorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
+ rpcService,
+ haServices,
+ blobServerResource.getBlobServer(),
+ new HeartbeatServices(100L, 1000L),
NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- JobManager.class,
- MemoryArchivist.class)._1();
+ new MemoryArchivedExecutionGraphStore(),
+ fatalErrorHandler);
+
+ // update the rest ports
+ final int restPort = dispatcherResourceManagerComponent
+ .getWebMonitorEndpoint()
+ .getServerAddress()
+ .getPort();
+ jmConfig.setInteger(RestOptions.PORT, restPort);
// the TaskManager java command
String[] command = new String[] {
@@ -134,7 +148,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
"-Xms80m", "-Xmx80m",
"-classpath", getCurrentClasspath(),
- AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+ AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
String.valueOf(jobManagerPort)
};
@@ -142,21 +156,14 @@ public class ProcessFailureCancelingITCase extends TestLogger {
taskManagerProcess = new ProcessBuilder(command).start();
new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
- // we wait for the JobManager to have the two TaskManagers available
- // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
- waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000);
-
final Throwable[] errorRef = new Throwable[1];
- final Configuration configuration = new Configuration();
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
-
// start the test program, which infinitely blocks
Runnable programRunner = new Runnable() {
@Override
public void run() {
try {
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration);
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration());
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public class ProcessFailureCancelingITCase extends TestLogger {
Thread programThread = new Thread(programRunner);
// kill the TaskManager
+ programThread.start();
+
+ final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L));
+
+ final DispatcherGateway dispatcherGateway = rpcService.connect(
+ leaderConnectionInfo.getAddress(),
+ DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+ DispatcherGateway.class).get();
+
+ waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+ clusterClient = new RestClusterClient<>(jmConfig, "standalone");
+
+ final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout);
+
+ assertThat(jobIds, hasSize(1));
+ final JobID jobId = jobIds.iterator().next();
+
+ // kill the TaskManager after the job started to run
taskManagerProcess.destroy();
taskManagerProcess = null;
- // immediately submit the job. this should hit the case
- // where the JobManager still thinks it has the TaskManager and tries to send it tasks
- programThread.start();
-
// try to cancel the job
- cancelRunningJob(jmActor);
+ clusterClient.cancel(jobId);
// we should see a failure within reasonable time (10s is the ask timeout).
// since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public class ProcessFailureCancelingITCase extends TestLogger {
if (taskManagerProcess != null) {
taskManagerProcess.destroy();
}
- if (jmActorSystem != null) {
- jmActorSystem.shutdown();
- }
-
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
- }
- }
-
- private void cancelRunningJob(ActorRef jobManager) throws Exception {
- final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
- // try at most for 30 seconds
- final long deadline = System.currentTimeMillis() + 30000;
-
- JobID jobId = null;
-
- do {
- Future<Object> response = Patterns.ask(jobManager,
- JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
- Object result;
- try {
- result = Await.result(response, askTimeout);
+ if (clusterClient != null) {
+ clusterClient.shutdown();
}
- catch (Exception e) {
- throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+ if (dispatcherResourceManagerComponent != null) {
+ dispatcherResourceManagerComponent.close();
}
- if (result instanceof JobManagerMessages.RunningJobsStatus) {
-
- List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+ fatalErrorHandler.rethrowError();
- if (jobs.size() == 1) {
- jobId = jobs.get(0).getJobId();
- break;
- }
- }
- }
- while (System.currentTimeMillis() < deadline);
-
- if (jobId == null) {
- // we never found it running, must have failed already
- return;
+ RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
}
-
- // tell the JobManager to cancel the job
- jobManager.tell(
- new JobManagerMessages.LeaderSessionMessage(
- HighAvailabilityServices.DEFAULT_LEADER_ID,
- new JobManagerMessages.CancelJob(jobId)),
- ActorRef.noSender());
}
- private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
- throws Exception {
- final long deadline = System.currentTimeMillis() + maxDelay;
- while (true) {
- long remaining = deadline - System.currentTimeMillis();
- if (remaining <= 0) {
- fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
- }
-
- FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+ /**
+ * Polls the dispatcher's cluster overview every 50 ms until at least one TaskManager is
+ * connected, the cluster reports exactly 2 slots in total, and none of them is available.
+ * In this test, that state indicates the submitted job has taken both slots.
+ *
+ * @param dispatcherGateway gateway used to request the cluster overview
+ * @param timeout used both as the timeout of each overview request and as the overall
+ * polling deadline
+ * @throws ExecutionException presumably if the condition is not met before the deadline or
+ * the retried future fails -- TODO confirm against FutureUtils
+ * @throws InterruptedException if interrupted while blocking on the result
+ */
+ private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
+ FutureUtils.retrySuccesfulWithDelay(
+ () -> dispatcherGateway.requestClusterOverview(timeout),
+ Time.milliseconds(50L),
+ Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+ // 2 total slots matches the slot count the TaskExecutor process is configured with
+ clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 &&
+ clusterOverview.getNumSlotsAvailable() == 0 &&
+ clusterOverview.getNumSlotsTotal() == 2,
+ TestingUtils.defaultScheduledExecutor())
+ .get();
+ }
- try {
- Future<?> result = Patterns.ask(jobManager,
- JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new Timeout(timeout));
- Integer numTMs = (Integer) Await.result(result, timeout);
- if (numTMs == numExpected) {
- break;
- }
- }
- catch (TimeoutException e) {
- // ignore and retry
- }
- catch (ClassCastException e) {
- fail("Wrong response: " + e.getMessage());
- }
- }
+ /**
+ * Polls {@link ClusterClient#listJobs()} every 50 ms until it returns a non-empty collection,
+ * then returns the IDs of the listed jobs.
+ *
+ * <p>NOTE(review): despite the method name, the job statuses are not filtered, so jobs in
+ * any state reported by {@code listJobs()} are returned -- confirm this is acceptable for
+ * callers.
+ *
+ * @param clusterClient client used to list the cluster's jobs
+ * @param timeout overall polling deadline
+ * @return the IDs of the listed jobs
+ * @throws ExecutionException presumably if no job shows up before the deadline or listing
+ * fails -- TODO confirm against FutureUtils
+ * @throws InterruptedException if interrupted while blocking on the result
+ */
+ private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+ return FutureUtils.retrySuccesfulWithDelay(
+ // listJobs declares a checked exception; wrap it so it can be used as a supplier
+ CheckedSupplier.unchecked(clusterClient::listJobs),
+ Time.milliseconds(50L),
+ Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+ jobs -> !jobs.isEmpty(),
+ TestingUtils.defaultScheduledExecutor())
+ .get()
+ .stream()
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
}
private void printProcessLog(String processName, String log) {