Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2018/02/27 13:05:58 UTC
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/5589
[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource
Updated version of #5579
@tillrohrmann I addressed your comments. Unfortunately, this doesn't work for FLIP-6 either, because `cancelWithSavepoint()` is not yet implemented. cc @GJL
cc @zentol @kl0u
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink jira-8757-port-it-cases-flip-6
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5589.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5589
----
commit e09c226a0f546682baca1c64a0ccfc81e3fe4ed5
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T09:12:44Z
[FLINK-8758] Add getters to JobDetailsInfo
commit 88012906d04779d3d1a57a892d75c4338a7c9577
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T11:28:59Z
[hotfix] Fix stoppable field in JobDetailsInfo
For some reason, the server was sending JSON with a stoppable field, not
the isStoppable field.
commit 614f24f59510d928a44e0ea7db174977cfc088ac
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T11:29:23Z
Add proper toString() on JsonResponse in RestClient
commit ae117d22e3d32b328bb2ae195b5dfb3b3f722708
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T10:44:57Z
[FLINK-8757] Add MiniClusterResource.getClusterClient()
commit e3a27297ba8ae34807c86a325ec3d79dadd49a73
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T10:52:50Z
[FLINK-8758] Make non-blocking ClusterClient.submitJob() public
commit a30c6557c41ac3fbf78a4e1971cf881deb770fb7
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-26T10:53:47Z
[FLINK-8758] Add ClusterClient.getJobStatus()
commit 544cf20c1689bbd095074027612e7b9da83cc772
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-27T12:40:51Z
[FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()
This retries getting a result until it matches a given predicate or
until we run out of retries.
commit de7f8d930da52ff18f3edaf9a550970bd1cff66c
Author: Aljoscha Krettek <al...@...>
Date: 2018-02-27T12:42:09Z
[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource
----
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170917656
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -165,6 +165,54 @@ public int hashCode() {
return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStoppable() {
--- End diff --
see `ResponseBody` javadocs:
```
When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
```
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170932875
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
- String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+ jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+ TestingUtils.defaultScheduledExecutor());
- // Wait until canceled
- msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
- Await.ready(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
return savepointPath;
}
- private void restoreJob(String savepointPath) throws Exception {
+ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
- Object msg;
- Object result;
+ assertNotNull(jobToRestore.getJobID());
--- End diff --
it can't now, but better be safe than sorry 😅
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:
https://github.com/apache/flink/pull/5589
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171229327
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+ retrySuccessfulOperationWithDelay(
+ resultFuture,
+ operation,
+ retries,
+ retryDelay,
+ retryPredicate,
+ scheduledExecutor);
+
+ return resultFuture;
+ }
+
+ private static <T> void retrySuccessfulOperationWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationResultFuture = operation.get();
+
+ operationResultFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ resultFuture.completeExceptionally(throwable);
+ }
+ } else {
+ if (retries > 0 && !retryPredicate.test(t)) {
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ resultFuture.whenComplete(
+ (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+ } else {
+ resultFuture.complete(t);
--- End diff --
We can change it to throw an exception if the predicate doesn't match on the final try.
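A minimal sketch of what that change could look like, using only `java.util.concurrent` rather than Flink's `ScheduledExecutor` and `Time`. The names `RetrySketch`, `retryUntilAccepted` and `RetriesExhaustedException` are hypothetical and not part of this pull request; the actual code would more likely reuse Flink's `RetryException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetrySketch {

    /** Hypothetical exception type, used only for this illustration. */
    static final class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(String message) {
            super(message);
        }
    }

    /** Retries the operation until the predicate accepts a result or the retries run out. */
    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {

        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        attempt(resultFuture, operation, retries, retryDelayMillis, acceptPredicate, executor);
        return resultFuture;
    }

    private static <T> void attempt(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {

        if (resultFuture.isDone()) {
            return;
        }

        operation.get().whenComplete((value, throwable) -> {
            if (throwable != null) {
                // The operation itself failed: propagate the failure.
                resultFuture.completeExceptionally(throwable);
            } else if (acceptPredicate.test(value)) {
                // The result is acceptable: we are done.
                resultFuture.complete(value);
            } else if (retries > 0) {
                // Unacceptable result, but retries are left: try again after the delay.
                executor.schedule(
                    () -> attempt(resultFuture, operation, retries - 1,
                        retryDelayMillis, acceptPredicate, executor),
                    retryDelayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Final try and the predicate still does not match: fail instead of
                // handing an unacceptable value back to the caller.
                resultFuture.completeExceptionally(
                    new RetriesExhaustedException("Result did not match the predicate on the final try."));
            }
        });
    }
}
```

With this shape, a caller waiting on the future gets an exception when the job never reaches the expected status, so the follow-up `assertEquals` on the status would only be a secondary check.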
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170934128
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
- String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+ jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+ TestingUtils.defaultScheduledExecutor());
- // Wait until canceled
- msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
- Await.ready(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
return savepointPath;
}
- private void restoreJob(String savepointPath) throws Exception {
+ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
- Object msg;
- Object result;
+ assertNotNull(jobToRestore.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToRestore, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture =
+ clusterClient.getJobStatus(jobToRestore.getJobID());
+
+ while (deadline.hasTimeLeft()
--- End diff --
dammit, I had this fixed but didn't commit it...
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171224289
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
--- End diff --
we could make this a predicate, which would also allow something like `() -> deadline.hasTimeLeft()`.
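A rough sketch of that idea, written against plain `java.util.concurrent` types instead of Flink's `ScheduledExecutor` and `Time`. `ConditionRetrySketch`, `retryWhile` and the `shouldRetry` parameter are hypothetical names for illustration only. As in the diff above, the future completes with the last result once retrying stops, whether or not the result was accepted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class ConditionRetrySketch {

    /** Retries while the result is not accepted and shouldRetry still returns true. */
    static <T> CompletableFuture<T> retryWhile(
            Supplier<CompletableFuture<T>> operation,
            BooleanSupplier shouldRetry,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {

        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        attempt(resultFuture, operation, shouldRetry, retryDelayMillis, acceptPredicate, executor);
        return resultFuture;
    }

    private static <T> void attempt(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            BooleanSupplier shouldRetry,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {

        if (resultFuture.isDone()) {
            return;
        }

        operation.get().whenComplete((value, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else if (!acceptPredicate.test(value) && shouldRetry.getAsBoolean()) {
                // Not accepted yet and the caller-supplied condition allows another try.
                executor.schedule(
                    () -> attempt(resultFuture, operation, shouldRetry,
                        retryDelayMillis, acceptPredicate, executor),
                    retryDelayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Either accepted, or the condition says stop: hand back the last result.
                resultFuture.complete(value);
            }
        });
    }
}
```

A fixed retry count then becomes one possible condition (for example a counter that is decremented on each call), and a test holding a Scala `Deadline` could pass `() -> deadline.hasTimeLeft()` directly instead of computing `deadline.timeLeft().toMillis() / 50`.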
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170917763
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -165,6 +165,54 @@ public int hashCode() {
return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStoppable() {
--- End diff --
https://stackoverflow.com/questions/32270422/jackson-renames-primitive-boolean-field-by-removing-is
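For illustration, the naming rule behind that link can be reproduced with the JDK's own `java.beans.Introspector`. This is only a sketch: Jackson does its own introspection and does not use `java.beans`, but by default it follows the same JavaBeans convention of dropping the `is` prefix from boolean getters. `BeanNamingSketch` and `Details` are hypothetical stand-ins, not Flink classes.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

final class BeanNamingSketch {

    /** Hypothetical stand-in for a response class such as JobDetailsInfo. */
    public static final class Details {
        public String getName() {
            return "job";
        }

        public boolean isStoppable() {
            return true;
        }
    }

    /** Returns the JavaBeans property names derived from the public getters of the type. */
    static Set<String> propertyNames(Class<?> type) {
        try {
            // Stop at Object so that the "class" property from getClass() is excluded.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
                names.add(descriptor.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `BeanNamingSketch.propertyNames(BeanNamingSketch.Details.class)` yields the names `name` and `stoppable`, which matches the `stoppable` field the server was seen to send once an `isStoppable()` method is visible to the serializer.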
---
[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5589
@zentol & @tillrohrmann I pushed some updates
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170935501
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
- String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+ jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+ TestingUtils.defaultScheduledExecutor());
- // Wait until canceled
- msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
- Await.ready(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
return savepointPath;
}
- private void restoreJob(String savepointPath) throws Exception {
+ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
- Object msg;
- Object result;
+ assertNotNull(jobToRestore.getJobID());
--- End diff --
Well, if we go down that route you'd have to check every return value. For example, why aren't you checking that `createJobGraph` doesn't return null?
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170915682
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
--- End diff --
Add an error message stating that the savepoint could not be taken.
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170938595
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,77 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
--- End diff --
"failures" is misleading in this context
---
[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5589
merged
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170915918
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
- String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+ jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+ TestingUtils.defaultScheduledExecutor());
- // Wait until canceled
- msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
- Await.ready(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
return savepointPath;
}
- private void restoreJob(String savepointPath) throws Exception {
+ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
- Object msg;
- Object result;
+ assertNotNull(jobToRestore.getJobID());
--- End diff --
This cannot happen, as far as I know.
---
[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5589
@zentol We can only merge the commits up until the actual port, since that test doesn't yet work on FLIP-6, btw.
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170917538
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -165,6 +165,54 @@ public int hashCode() {
return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStoppable() {
--- End diff --
Can you try annotating this one with `@JsonIgnore` and see what is sent?
Jackson analyzes getters, and this may change the resulting JSON.
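
For illustration only, here is a minimal sketch of the naming convention that is probably at play. It uses the JDK's `java.beans.Introspector` rather than Jackson itself, on the assumption that Jackson derives property names from getters in a similar JavaBeans-style way. `GetterNameSketch` and `Details` are hypothetical names, not classes from the PR.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class GetterNameSketch {

    /** Hypothetical stand-in for a response class whose field is named isStoppable. */
    public static class Details {
        private final boolean isStoppable;

        public Details(boolean isStoppable) {
            this.isStoppable = isStoppable;
        }

        // JavaBeans naming strips the "is" prefix, so the property is "stoppable".
        public boolean isStoppable() {
            return isStoppable;
        }
    }

    /** Returns the property names that the JavaBeans introspector derives from getters. */
    public static List<String> propertyNames(Class<?> type) {
        try {
            // Object.class as stop class excludes the "class" property from getClass().
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
                names.add(descriptor.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(propertyNames(Details.class)); // prints [stoppable]
    }
}
```

If Jackson behaves the same way here, an unannotated `isStoppable()` getter would add a `stoppable` property next to the explicitly named field, which matches the symptom described in the hotfix commit.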
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171226341
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+ retrySuccessfulOperationWithDelay(
+ resultFuture,
+ operation,
+ retries,
+ retryDelay,
+ retryPredicate,
+ scheduledExecutor);
+
+ return resultFuture;
+ }
+
+ private static <T> void retrySuccessfulOperationWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationResultFuture = operation.get();
+
+ operationResultFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ resultFuture.completeExceptionally(throwable);
+ }
+ } else {
+ if (retries > 0 && !retryPredicate.test(t)) {
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ resultFuture.whenComplete(
+ (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+ } else {
+ resultFuture.complete(t);
--- End diff --
So if the retries are exhausted, we complete the future with the last returned operation result?
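
To make the question concrete, the following is a simplified, JDK-only sketch of the control flow quoted above. It is not Flink's implementation: `RetrySketch`, `retry`, and `lastValueAfterExhaustion` are hypothetical names, and a plain `ScheduledExecutorService` stands in for Flink's `ScheduledExecutor`. Cancellation handling is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetrySketch {

    // Simplified stand-in for the quoted helper; same branching, fewer features.
    public static <T> void retry(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {

        if (result.isDone()) {
            return;
        }
        operation.get().whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else if (retries > 0 && !accept.test(value)) {
                executor.schedule(
                    () -> retry(result, operation, retries - 1, delayMillis, accept, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Reached on a match, but also when the retries have run out.
                result.complete(value);
            }
        });
    }

    /** Runs an operation whose result is never accepted and returns what the future yields. */
    public static int lastValueAfterExhaustion(long retries) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            CompletableFuture<Integer> result = new CompletableFuture<>();
            retry(
                result,
                () -> CompletableFuture.completedFuture(calls.incrementAndGet()),
                retries,
                1L,
                value -> false,
                executor);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(lastValueAfterExhaustion(3)); // prints 4
    }
}
```

In this sketch the future completes normally with the result of the last attempt even though the predicate never matched, so a caller has to re-check the value, as the test does with `assertEquals`.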
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170937921
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -165,6 +165,55 @@ public int hashCode() {
return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty(FIELD_NAME_IS_STOPPABLE)
--- End diff --
You should do this for all getters; otherwise we can run into the same issue should we ever rename a field.
---
[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5589
Rebased one more time
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170916096
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
- String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+ jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
+ TestingUtils.defaultScheduledExecutor());
- // Wait until canceled
- msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
- Await.ready(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
return savepointPath;
}
- private void restoreJob(String savepointPath) throws Exception {
+ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
- Object msg;
- Object result;
+ assertNotNull(jobToRestore.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToRestore, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture =
+ clusterClient.getJobStatus(jobToRestore.getJobID());
+
+ while (deadline.hasTimeLeft()
--- End diff --
use `FutureUtils.retrySuccesfulWithDelay` instead?
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170937261
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
@@ -165,6 +165,54 @@ public int hashCode() {
return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
}
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStoppable() {
--- End diff --
nice! I fixed it
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171226593
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
--- End diff --
or just accept a deadline instead
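
A rough sketch of what a deadline-based variant might look like, using only the JDK. Everything here is an assumption for illustration: `DeadlineRetrySketch`, `retryUntil`, and `timesOut` are hypothetical names, the deadline is a plain `System.nanoTime()` value rather than Flink's `Deadline`, and failing with a `TimeoutException` once the deadline passes is one possible design choice, not something the PR specifies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class DeadlineRetrySketch {

    /** Retries until the predicate accepts a result or the nanoTime deadline has passed. */
    public static <T> CompletableFuture<T> retryUntil(
            Supplier<CompletableFuture<T>> operation,
            long deadlineNanos,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {

        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(result, operation, deadlineNanos, delayMillis, accept, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long deadlineNanos,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {

        if (result.isDone()) {
            return;
        }
        operation.get().whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else if (accept.test(value)) {
                result.complete(value);
            } else if (System.nanoTime() - deadlineNanos >= 0) {
                // Assumed behavior: fail explicitly instead of returning an unaccepted value.
                result.completeExceptionally(new TimeoutException(
                    "No acceptable result before the deadline; last value: " + value));
            } else {
                executor.schedule(
                    () -> attempt(result, operation, deadlineNanos, delayMillis, accept, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            }
        });
    }

    /** Demo helper: true if a never-matching operation ends in a TimeoutException. */
    public static boolean timesOut(long budgetMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
            CompletableFuture<String> status = retryUntil(
                () -> CompletableFuture.completedFuture("RUNNING"),
                deadline,
                5L,
                value -> "CANCELED".equals(value),
                executor);
            status.get(5, TimeUnit.SECONDS);
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (Exception e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(30));
    }
}
```

With a deadline parameter the caller no longer has to convert the remaining time into a retry count (the `deadline.timeLeft().toMillis() / 50` computation in the test), and the exhausted case surfaces as an exception rather than as an unaccepted value.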
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171225281
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
--- End diff --
The Javadoc should say what happens when the number of retries is exhausted.
---
[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5589
@zentol rebased
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170932268
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -79,16 +62,20 @@
*/
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
+ private static final int NUM_TMS = 2;
--- End diff --
fixing
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170915210
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -79,16 +62,20 @@
*/
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
+ private static final int NUM_TMS = 2;
--- End diff --
the existing test only uses 1 task manager.
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r170932906
--- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
@@ -104,175 +91,97 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}
- @BeforeClass
- public static void setupCluster() throws Exception {
- final Configuration configuration = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
- actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
- highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- TestingUtils.defaultExecutor());
-
- Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
- configuration,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- actorSystem,
- timeout);
-
- archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
- ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
- // Wait until connected
- Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
- }
-
- @AfterClass
- public static void tearDownCluster() throws Exception {
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- }
-
- if (actorSystem != null) {
- actorSystem.shutdown();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (jobManager != null) {
- jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
- taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
+ ClassLoader classLoader = this.getClass().getClassLoader();
+ ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
+ clusterClient.setDetached(true);
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+
// submit job with old version savepoint and create a migrated savepoint in the new version
- String savepointPath = migrateJob();
+ String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
- restoreJob(savepointPath);
+ restoreJob(classLoader, clusterClient, deadline, savepointPath);
}
- private String migrateJob() throws Throwable {
+ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
- Object msg;
- Object result;
+ assertNotNull(jobToMigrate.getJobID());
- // Submit job graph
- msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
- result = Await.result(jobManager.ask(msg, timeout), timeout);
+ clusterClient.submitJob(jobToMigrate, classLoader);
- if (result instanceof JobManagerMessages.JobResultFailure) {
- JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
- throw new Exception(failure.cause());
- }
- Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+ CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+ () -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+ deadline.timeLeft().toMillis() / 50,
+ Time.milliseconds(50),
+ (jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
+ TestingUtils.defaultScheduledExecutor());
- // Wait for all tasks to be running
- msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
+ assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
- msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+ String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
- boolean retry = true;
- for (int i = 0; retry && i < 10; i++) {
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
-
- if (result instanceof JobManagerMessages.CancellationFailure) {
- Thread.sleep(50L);
- } else {
- retry = false;
+ while (deadline.hasTimeLeft() && savepointPath == null) {
+ try {
+ savepointPath = clusterClient.cancelWithSavepoint(
+ jobToMigrate.getJobID(),
+ targetDirectory.getAbsolutePath());
+ } catch (Exception e) {
+ if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
+ throw e;
+ }
}
}
- if (result instanceof JobManagerMessages.CancellationFailure) {
- JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
- throw new Exception(failure.cause());
- }
+ assertNotNull(savepointPath);
--- End diff --
fixing
---
[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171228120
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+ /**
+ * Retry the given operation with the given delay in between successful completions where the
+ * result does not match a given predicate.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param retryPredicate Predicate to test whether the result is acceptable
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry
+ * in case the predicate isn't matched
+ */
+ public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+ retrySuccessfulOperationWithDelay(
+ resultFuture,
+ operation,
+ retries,
+ retryDelay,
+ retryPredicate,
+ scheduledExecutor);
+
+ return resultFuture;
+ }
+
+ private static <T> void retrySuccessfulOperationWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final long retries,
+ final Time retryDelay,
+ final Predicate<T> retryPredicate,
+ final ScheduledExecutor scheduledExecutor) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationResultFuture = operation.get();
+
+ operationResultFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ resultFuture.completeExceptionally(throwable);
+ }
+ } else {
+ if (retries > 0 && !retryPredicate.test(t)) {
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ resultFuture.whenComplete(
+ (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+ } else {
+ resultFuture.complete(t);
--- End diff --
This seems rather unfortunate. In the code sample below I wait for a job to be RUNNING. I can only replace the loop, but not the final check, which really limits the usefulness:
```
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
Thread.sleep(50);
jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
}
================== Comparison ===============================
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
() -> client.getJobStatus(jobSubmissionResult.getJobID()),
submissionDeadLine.timeLeft().toMillis() / 50, // crappy retry count calculation
Time.milliseconds(50),
status -> status == JobStatus.RUNNING,
TestingUtils.defaultScheduledExecutor()
);
if (jobStatusFuture.get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS) != JobStatus.RUNNING) {
Assert.fail("Job not in state RUNNING.");
}
```
In its current form this doesn't really provide value imo. (Yes, it's asynchronous, but in which tests do we really even need that?)
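One way the final check could become unnecessary, sketched under assumptions: a variant that takes a timeout instead of a retry count and completes exceptionally when the deadline passes without an acceptable result. `DeadlineRetrySketch`, `retryUntilDeadline` and `attempt` are hypothetical names, not part of `FutureUtils`, and the sketch uses plain JDK types rather than Flink's `Time`, `Deadline` and `ScheduledExecutor`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class DeadlineRetrySketch {

    /** Hypothetical helper: polls until the predicate accepts a result or the deadline passes. */
    static <T> CompletableFuture<T> retryUntilDeadline(
            Supplier<CompletableFuture<T>> operation,
            Duration timeout,
            Duration retryDelay,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        attempt(result, operation, deadlineNanos, retryDelay, accept, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long deadlineNanos,
            Duration retryDelay,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return; // the caller cancelled or completed the future in the meantime
        }
        operation.get().whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else if (accept.test(value)) {
                result.complete(value);
            } else if (System.nanoTime() - deadlineNanos >= 0) {
                // Out of time: fail instead of handing back the rejected value.
                result.completeExceptionally(
                    new TimeoutException("Last result before deadline: " + value));
            } else {
                executor.schedule(
                    () -> attempt(result, operation, deadlineNanos, retryDelay, accept, executor),
                    retryDelay.toMillis(),
                    TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stand-in for client.getJobStatus(...): the status never leaves CREATED.
            CompletableFuture<String> status = retryUntilDeadline(
                () -> CompletableFuture.completedFuture("CREATED"),
                Duration.ofMillis(200),
                Duration.ofMillis(50),
                "RUNNING"::equals,
                executor);
            try {
                status.join();
            } catch (CompletionException e) {
                // The cause is the TimeoutException raised once the deadline passed.
                System.out.println("failed: " + e.getCause());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With a helper along these lines the test would only need a single `get()` on the returned future: a job that never reaches RUNNING surfaces as an exception rather than as a status that has to be compared again.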
---