Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2018/02/27 13:05:58 UTC

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5589

    [FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource

    Updated version of #5579 
    
    @tillrohrmann I addressed your comments. Unfortunately, this does not work for FLIP-6 yet because `cancelWithSavepoint()` is not yet implemented. cc @GJL 
    
    cc @zentol @kl0u 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-8757-port-it-cases-flip-6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5589.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5589
    
----
commit e09c226a0f546682baca1c64a0ccfc81e3fe4ed5
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T09:12:44Z

    [FLINK-8758] Add getters to JobDetailsInfo

commit 88012906d04779d3d1a57a892d75c4338a7c9577
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T11:28:59Z

    [hotfix] Fix stoppable field in JobDetailsInfo
    
    For some reason, the server was sending JSON with a stoppable field, not
    the isStoppable field.

commit 614f24f59510d928a44e0ea7db174977cfc088ac
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T11:29:23Z

    Add proper toString() on JsonResponse in RestClient

commit ae117d22e3d32b328bb2ae195b5dfb3b3f722708
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T10:44:57Z

    [FLINK-8757] Add MiniClusterResource.getClusterClient()

commit e3a27297ba8ae34807c86a325ec3d79dadd49a73
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T10:52:50Z

    [FLINK-8758] Make non-blocking ClusterClient.submitJob() public

commit a30c6557c41ac3fbf78a4e1971cf881deb770fb7
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-26T10:53:47Z

    [FLINK-8758] Add ClusterClient.getJobStatus()

commit 544cf20c1689bbd095074027612e7b9da83cc772
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-27T12:40:51Z

    [FLINK-8758] Add FutureUtils.retrySuccessfulWithDelay()
    
    This retries getting a result until it matches a given predicate or
    until we run out of retries.

commit de7f8d930da52ff18f3edaf9a550970bd1cff66c
Author: Aljoscha Krettek <al...@...>
Date:   2018-02-27T12:42:09Z

    [FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource

----


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170917656
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
    @@ -165,6 +165,54 @@ public int hashCode() {
     		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
     	}
     
    +	public JobID getJobId() {
    +		return jobId;
    +	}
    +
    +	public String getName() {
    +		return name;
    +	}
    +
    +	public boolean isStoppable() {
    --- End diff --
    
    see `ResponseBody` javadocs:
    ```
    When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
    ```


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170932875
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
    --- End diff --
    
    it can't now, but better be safe than sorry 😅 


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/5589


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171229327
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +		retrySuccessfulOperationWithDelay(
    +			resultFuture,
    +			operation,
    +			retries,
    +			retryDelay,
    +			retryPredicate,
    +			scheduledExecutor);
    +
    +		return resultFuture;
    +	}
    +
    +	private static <T> void retrySuccessfulOperationWithDelay(
    +		final CompletableFuture<T> resultFuture,
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		if (!resultFuture.isDone()) {
    +			final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +			operationResultFuture.whenComplete(
    +				(t, throwable) -> {
    +					if (throwable != null) {
    +						if (throwable instanceof CancellationException) {
    +							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +						} else {
    +							resultFuture.completeExceptionally(throwable);
    +						}
    +					} else {
    +						if (retries > 0 && !retryPredicate.test(t)) {
    +							final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +								() -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +								retryDelay.toMilliseconds(),
    +								TimeUnit.MILLISECONDS);
    +
    +							resultFuture.whenComplete(
    +								(innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +						} else {
    +							resultFuture.complete(t);
    --- End diff --
    
    We can change it to throw an exception if the predicate doesn't match on the final try.
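
A minimal sketch of the behaviour suggested in this comment, written against plain JDK types rather than Flink's `Time` and `ScheduledExecutor`. The class name `RetryUntilAcceptedSketch`, the method `retryUntilAccepted`, and the choice of `IllegalStateException` are assumptions made for illustration; they are not part of the pull request. The only difference from the quoted diff is the last branch: when the retries run out and the predicate still does not match, the result future completes exceptionally instead of returning the unaccepted value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the helper under review; all names are illustrative.
final class RetryUntilAcceptedSketch {

    /** Retries {@code operation} until {@code accept} matches or the retries run out. */
    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(result, operation, retries, delayMillis, accept, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return; // the caller already completed or cancelled the result
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                // A failed operation is not retried, it fails the result directly.
                result.completeExceptionally(failure);
            } else if (accept.test(value)) {
                result.complete(value);
            } else if (retries > 0) {
                executor.schedule(
                    () -> attempt(result, operation, retries - 1, delayMillis, accept, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Assumed behaviour: fail instead of handing back an unaccepted value.
                result.completeExceptionally(new IllegalStateException(
                    "Retries exhausted; last result was not accepted: " + value));
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            CompletableFuture<Integer> accepted = retryUntilAccepted(
                () -> CompletableFuture.completedFuture(calls.incrementAndGet()),
                5, 10, v -> v >= 3, executor);
            System.out.println("accepted value: " + accepted.get(5, TimeUnit.SECONDS));

            CompletableFuture<Integer> rejected = retryUntilAccepted(
                () -> CompletableFuture.completedFuture(0),
                2, 10, v -> v > 0, executor);
            try {
                rejected.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With this variant a caller such as the test above would no longer need a separate `assertEquals` on the returned status, since a mismatch surfaces as an `ExecutionException` from `get()`.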



---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170934128
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToRestore, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture =
    +			clusterClient.getJobStatus(jobToRestore.getJobID());
    +
    +		while (deadline.hasTimeLeft()
    --- End diff --
    
    dammit, I had this fixed but didn't commit it...


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171224289
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    --- End diff --
    
    we could make this a predicate, which would also allow something like `() -> deadline.hasTimeLeft()`.
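
A rough sketch of what this suggestion could look like, using only JDK types. The names `RetryWhileSketch` and `retryWhile` are hypothetical, `java.util.function.BooleanSupplier` stands in for the proposed predicate, and a `System.nanoTime()` comparison stands in for `deadline.hasTimeLeft()`. Failing the future once the condition turns false is an assumption borrowed from the other review comment on this method, not something the diff does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical variant: the retry budget is a condition instead of a counter.
final class RetryWhileSketch {

    static <T> CompletableFuture<T> retryWhile(
            Supplier<CompletableFuture<T>> operation,
            BooleanSupplier shouldRetry,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(result, operation, shouldRetry, delayMillis, accept, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            BooleanSupplier shouldRetry,
            long delayMillis,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return;
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else if (accept.test(value)) {
                result.complete(value);
            } else if (shouldRetry.getAsBoolean()) {
                // The condition is re-evaluated after every unaccepted result.
                executor.schedule(
                    () -> attempt(result, operation, shouldRetry, delayMillis, accept, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(new IllegalStateException(
                    "Gave up retrying; last result was not accepted: " + value));
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Deadline-style condition, comparable to () -> deadline.hasTimeLeft().
            long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            BooleanSupplier hasTimeLeft = () -> System.nanoTime() - deadlineNanos < 0;

            AtomicInteger polls = new AtomicInteger();
            CompletableFuture<String> status = retryWhile(
                () -> CompletableFuture.completedFuture(
                    polls.incrementAndGet() < 4 ? "CREATED" : "RUNNING"),
                hasTimeLeft, 20, s -> s.equals("RUNNING"), executor);
            System.out.println(status.get(5, TimeUnit.SECONDS) + " after " + polls.get() + " polls");
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Compared with the `deadline.timeLeft().toMillis() / 50` computation in the test, a condition of this kind keeps the retry budget tied to the deadline itself, so slow operations cannot stretch the total wait beyond it by more than one attempt.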


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170917763
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
    @@ -165,6 +165,54 @@ public int hashCode() {
     		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
     	}
     
    +	public JobID getJobId() {
    +		return jobId;
    +	}
    +
    +	public String getName() {
    +		return name;
    +	}
    +
    +	public boolean isStoppable() {
    --- End diff --
    
    https://stackoverflow.com/questions/32270422/jackson-renames-primitive-boolean-field-by-removing-is
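
The renaming the linked question describes comes from the JavaBeans-style accessor convention, under which a `boolean isStoppable()` method denotes a property called `stoppable`. The sketch below reproduces a simplified form of that rule with plain reflection to show why the JSON ended up with a `stoppable` field. It is not Jackson's implementation; `BooleanAccessorNamingSketch` and the reduced `Details` class are hypothetical stand-ins.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of bean-style property naming; not Jackson's actual code.
final class BooleanAccessorNamingSketch {

    // Hypothetical response class, reduced to the two accessors relevant here.
    static final class Details {
        public String getName() { return "job"; }
        public boolean isStoppable() { return true; }
    }

    /** Maps each derived property name to the accessor it was derived from. */
    static Map<String, String> propertyNames(Class<?> type) {
        Map<String, String> properties = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            if (!Modifier.isPublic(method.getModifiers())
                    || method.isSynthetic()
                    || method.getParameterCount() != 0) {
                continue;
            }
            String name = method.getName();
            if (name.startsWith("get") && name.length() > 3) {
                properties.put(decapitalize(name.substring(3)), name);
            } else if (name.startsWith("is") && name.length() > 2
                    && method.getReturnType() == boolean.class) {
                // The "is" prefix is dropped, so isStoppable() becomes "stoppable".
                properties.put(decapitalize(name.substring(2)), name);
            }
        }
        return properties;
    }

    private static String decapitalize(String s) {
        return Character.toLowerCase(s.charAt(0)) + s.substring(1);
    }

    public static void main(String[] args) {
        // prints {name=getName, stoppable=isStoppable}
        System.out.println(propertyNames(Details.class));
    }
}
```

In Jackson itself, the usual ways around this are to pin the name on the accessor with `@JsonProperty` or to exclude the accessor with `@JsonIgnore`, which is what the `ResponseBody` javadoc quoted earlier in the thread asks for on `get`-prefixed methods.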


---

[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5589
  
    @zentol & @tillrohrmann I pushed some updates


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170935501
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
    --- End diff --
    
    Well, if we go down that route you'd have to check every return value. For example, why aren't you checking that `createJobGraph` doesn't return null?


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170915682
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
    --- End diff --
    
    Add an error message to this assertion stating that the savepoint could not be taken.
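
A minimal sketch of what such a message could look like, using only JDK classes. The `assertNotNull(String, Object)` helper below is a local stand-in that mirrors the shape of JUnit 4's `Assert.assertNotNull(String, Object)`; `awaitSavepoint` and the `Supplier` argument are hypothetical names for illustration and are not part of the pull request.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

class SavepointAssertSketch {

    // Local stand-in that mirrors the shape of JUnit 4's Assert.assertNotNull(String, Object).
    static void assertNotNull(String message, Object value) {
        if (value == null) {
            throw new AssertionError(message);
        }
    }

    // Calls the (hypothetical) savepoint action until it yields a path or the time budget is used up.
    static String awaitSavepoint(Supplier<String> cancelWithSavepoint, Duration budget) {
        Instant end = Instant.now().plus(budget);
        String savepointPath = null;
        while (savepointPath == null && Instant.now().isBefore(end)) {
            savepointPath = cancelWithSavepoint.get();
        }
        // The message tells the reader what failed instead of a bare AssertionError.
        assertNotNull("Savepoint could not be taken within " + budget.toMillis() + " ms.", savepointPath);
        return savepointPath;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // This supplier returns null twice and succeeds on the third attempt.
        String path = awaitSavepoint(() -> ++attempts[0] < 3 ? null : "/tmp/savepoint-1", Duration.ofSeconds(1));
        System.out.println(path);

        try {
            // This supplier never succeeds, so the assertion fires with the message.
            awaitSavepoint(() -> null, Duration.ofMillis(20));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a message in place, a timed-out retry loop reports why the test failed rather than only where.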


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170938595
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,77 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
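    --- End diff --
    
    "failures" is misleading in this context: the retry is triggered by a successful completion whose result does not match the predicate, not by a failed operation.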


---

[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5589
  
    merged


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170915918
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
    --- End diff --
    
    As far as I know, this cannot happen: `getJobID()` does not return null.


---

[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5589
  
    @zentol We can only merge the commits up until the actual port, since that test doesn't yet work on FLIP-6, btw.


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170917538
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
    @@ -165,6 +165,54 @@ public int hashCode() {
     		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
     	}
     
    +	public JobID getJobId() {
    +		return jobId;
    +	}
    +
    +	public String getName() {
    +		return name;
    +	}
    +
    +	public boolean isStoppable() {
    --- End diff --
    
    can you try annotating this one with `@JsonIgnore` and seeing what is sent?
    
    Jackson analyzes getters and this may mess with the resulting JSON.
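
To illustrate how a getter can change the property name, here is a small sketch. It does not use Jackson; it uses the JDK's `java.beans.Introspector` as a stand-in, on the assumption that Jackson's default naming for `isX()` getters follows the same JavaBeans convention. The `Info` class is a hypothetical, trimmed-down stand-in for `JobDetailsInfo`.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class GetterNamingSketch {

    // Hypothetical stand-in for JobDetailsInfo: the field is called isStoppable,
    // but the getter isStoppable() maps to a bean property named "stoppable".
    public static class Info {
        private final String name = "job";
        private final boolean isStoppable = true;

        public String getName() {
            return name;
        }

        public boolean isStoppable() {
            return isStoppable;
        }
    }

    // Returns the sorted property names that JavaBeans introspection derives from the getters.
    static List<String> propertyNames(Class<?> beanClass) {
        try {
            // Stop at Object.class so that getClass() is not reported as a property.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
                names.add(descriptor.getName());
            }
            Collections.sort(names);
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The "is" prefix is dropped, so no property called "isStoppable" appears.
        System.out.println(propertyNames(Info.class));
    }
}
```

If the serializer derives names the same way, the getter contributes a `stoppable` property next to whatever name the field is annotated with, which matches the symptom described in the hotfix commit.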


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171226341
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +		retrySuccessfulOperationWithDelay(
    +			resultFuture,
    +			operation,
    +			retries,
    +			retryDelay,
    +			retryPredicate,
    +			scheduledExecutor);
    +
    +		return resultFuture;
    +	}
    +
    +	private static <T> void retrySuccessfulOperationWithDelay(
    +		final CompletableFuture<T> resultFuture,
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		if (!resultFuture.isDone()) {
    +			final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +			operationResultFuture.whenComplete(
    +				(t, throwable) -> {
    +					if (throwable != null) {
    +						if (throwable instanceof CancellationException) {
    +							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +						} else {
    +							resultFuture.completeExceptionally(throwable);
    +						}
    +					} else {
    +						if (retries > 0 && !retryPredicate.test(t)) {
    +							final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +								() -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +								retryDelay.toMilliseconds(),
    +								TimeUnit.MILLISECONDS);
    +
    +							resultFuture.whenComplete(
    +								(innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +						} else {
    +							resultFuture.complete(t);
    --- End diff --
    
    So if the retries are exhausted, we complete the result future with the last returned operation result, even though it did not match the predicate?
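
The behaviour in question can be reproduced with a simplified, JDK-only sketch of the same control flow. This is not the Flink implementation: it swaps Flink's `ScheduledExecutor` and `Time` for `ScheduledExecutorService` and a millisecond delay, leaves out the cancellation handling, and uses hypothetical names (`retry`, `runWithNeverMatchingPredicate`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetryExhaustionSketch {

    // Simplified stand-in for the reviewed method: retry while the result is
    // rejected by the predicate and retries remain.
    static <T> void retry(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> acceptResult,
            ScheduledExecutorService executor) {

        operation.get().whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else if (retries > 0 && !acceptResult.test(value)) {
                executor.schedule(
                    () -> retry(resultFuture, operation, retries - 1, delayMillis, acceptResult, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Reached when the predicate matched OR when retries == 0.
                resultFuture.complete(value);
            }
        });
    }

    // Runs an operation that returns its own call count against a predicate that never matches.
    static int runWithNeverMatchingPredicate(long retries) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            CompletableFuture<Integer> result = new CompletableFuture<>();
            retry(result, () -> CompletableFuture.completedFuture(calls.incrementAndGet()),
                retries, 5L, value -> false, executor);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One initial call plus three retries; the fourth result completes the future.
        System.out.println(runWithNeverMatchingPredicate(3)); // prints 4
    }
}
```

In this sketch the future completes normally with a value the predicate rejected, so a caller has to re-check the result, which is what the test's `assertEquals` on the job status ends up doing.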


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170937921
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
    @@ -165,6 +165,55 @@ public int hashCode() {
     		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
     	}
     
    +	public JobID getJobId() {
    +		return jobId;
    +	}
    +
    +	public String getName() {
    +		return name;
    +	}
    +
    +	@JsonProperty(FIELD_NAME_IS_STOPPABLE)
    --- End diff --
    
    you should do this for all getters, as otherwise we can run into the same issue should we ever rename the field.


---

[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5589
  
    Rebased one more time


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170916096
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToRestore, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture =
    +			clusterClient.getJobStatus(jobToRestore.getJobID());
    +
    +		while (deadline.hasTimeLeft()
    --- End diff --
    
    use `FutureUtils.retrySuccesfulWithDelay` instead?


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170937261
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java ---
    @@ -165,6 +165,54 @@ public int hashCode() {
     		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
     	}
     
    +	public JobID getJobId() {
    +		return jobId;
    +	}
    +
    +	public String getName() {
    +		return name;
    +	}
    +
    +	public boolean isStoppable() {
    --- End diff --
    
    nice! I fixed it


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171226593
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    --- End diff --
    
    or just accept a deadline instead
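
A deadline-accepting variant could look roughly like the sketch below. It is written against plain JDK types rather than Flink's `Time`, `Deadline` and `ScheduledExecutor`, and the names `DeadlineRetrySketch` and `retryUntilDeadline` are hypothetical, not part of `FutureUtils`. The assumption here is that a passed deadline should fail the returned future with a `TimeoutException` instead of completing it with the last non-matching result.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch, not Flink API: retries until the result is accepted or the deadline passes.
final class DeadlineRetrySketch {

    static <T> CompletableFuture<T> retryUntilDeadline(
            Supplier<CompletableFuture<T>> operation,
            Duration timeout,
            Duration retryDelay,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        attempt(result, operation, deadlineNanos, retryDelay, accept, executor);
        return result;
    }

    private static <T> void attempt(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long deadlineNanos,
            Duration retryDelay,
            Predicate<T> accept,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return; // caller cancelled or completed the future in the meantime
        }
        operation.get().whenComplete((value, failure) -> {
            try {
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else if (accept.test(value)) {
                    result.complete(value);
                } else if (System.nanoTime() - deadlineNanos >= 0) {
                    // Deadline passed without an acceptable result: fail instead of returning it.
                    result.completeExceptionally(
                        new TimeoutException("Deadline passed; last result: " + value));
                } else {
                    executor.schedule(
                        () -> attempt(result, operation, deadlineNanos, retryDelay, accept, executor),
                        retryDelay.toMillis(),
                        TimeUnit.MILLISECONDS);
                }
            } catch (Throwable t) {
                // Predicate or scheduling failures must not leave the future hanging.
                result.completeExceptionally(t);
            }
        });
    }
}
```

With this shape the caller passes the remaining time directly and no longer has to derive a retry count from it.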


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171225281
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    --- End diff --
    
    the javadocs should say what happens if the number of retries is exhausted
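
For reference, the implementation quoted in this pull request completes the result future with the last value once `retries` reaches zero, whether or not the predicate matched. The sketch below is a simplified JDK-only mirror of that logic (the class name `RetryExhaustionSketch` and the parameter name `acceptPredicate` are hypothetical, and the `CancellationException` wrapping and scheduled-future cancellation of the original are left out) so the exhausted case can be observed in isolation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Simplified mirror of the quoted retry logic, using JDK types only (not the Flink class).
final class RetryExhaustionSketch {

    static <T> CompletableFuture<T> retrySuccessfulWithDelay(
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        retry(result, operation, retries, delayMillis, acceptPredicate, executor);
        return result;
    }

    private static <T> void retry(
            CompletableFuture<T> result,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long delayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {
        if (result.isDone()) {
            return;
        }
        operation.get().whenComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else if (retries > 0 && !acceptPredicate.test(value)) {
                executor.schedule(
                    () -> retry(result, operation, retries - 1, delayMillis, acceptPredicate, executor),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Reached on a match AND when retries are used up:
                // the caller cannot tell the two cases apart without re-checking the value.
                result.complete(value);
            }
        });
    }
}
```

With an operation that always yields "CREATED", two retries and a predicate that accepts only "RUNNING", the returned future completes normally with "CREATED" after three invocations, which is the case the Javadoc would need to spell out.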


---

[GitHub] flink issue #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase to Mini...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5589
  
    @zentol rebased


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170932268
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -79,16 +62,20 @@
      */
     public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
     
    +	private static final int NUM_TMS = 2;
    --- End diff --
    
    fixing


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170915210
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -79,16 +62,20 @@
      */
     public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
     
    +	private static final int NUM_TMS = 2;
    --- End diff --
    
    the existing test only uses 1 task manager.


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170932906
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
    --- End diff --
    
    fixing


---

[GitHub] flink pull request #5589: [FLINK-8797] Port AbstractOperatorRestoreTestBase ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171228120
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
     		}
     	}
     
    +	/**
    +	 * Retry the given operation with the given delay in between successful completions where the
    +	 * result does not match a given predicate.
    +	 *
    +	 * @param operation to retry
    +	 * @param retries number of retries
    +	 * @param retryDelay delay between retries
    +	 * @param retryPredicate Predicate to test whether the result is acceptable
    +	 * @param scheduledExecutor executor to be used for the retry operation
    +	 * @param <T> type of the result
    +	 * @return Future which retries the given operation a given amount of times and delays the retry
    +	 *   in case the predicate isn't matched
    +	 */
    +	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +		retrySuccessfulOperationWithDelay(
    +			resultFuture,
    +			operation,
    +			retries,
    +			retryDelay,
    +			retryPredicate,
    +			scheduledExecutor);
    +
    +		return resultFuture;
    +	}
    +
    +	private static <T> void retrySuccessfulOperationWithDelay(
    +		final CompletableFuture<T> resultFuture,
    +		final Supplier<CompletableFuture<T>> operation,
    +		final long retries,
    +		final Time retryDelay,
    +		final Predicate<T> retryPredicate,
    +		final ScheduledExecutor scheduledExecutor) {
    +
    +		if (!resultFuture.isDone()) {
    +			final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +			operationResultFuture.whenComplete(
    +				(t, throwable) -> {
    +					if (throwable != null) {
    +						if (throwable instanceof CancellationException) {
    +							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +						} else {
    +							resultFuture.completeExceptionally(throwable);
    +						}
    +					} else {
    +						if (retries > 0 && !retryPredicate.test(t)) {
    +							final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +								() -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +								retryDelay.toMilliseconds(),
    +								TimeUnit.MILLISECONDS);
    +
    +							resultFuture.whenComplete(
    +								(innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +						} else {
    +							resultFuture.complete(t);
    --- End diff --
    
    This seems rather unfortunate. In the code sample below I wait for a job to be RUNNING. I can only replace the loop, but not the final check, which really limits the usefulness:
    ```
    JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
    	Thread.sleep(50);
    	jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    }
    
    if (jobStatus != JobStatus.RUNNING) {
    	Assert.fail("Job not in state RUNNING.");
    }
    
    ================== Comparison ===============================
    
    CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    	() -> client.getJobStatus(jobSubmissionResult.getJobID()),
    	submissionDeadLine.timeLeft().toMillis() / 50, // crappy retry count calculation
    	Time.milliseconds(50),
    	status -> status == JobStatus.RUNNING,
    	TestingUtils.defaultScheduledExecutor()
    );
    
    if (jobStatusFuture.get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS) != JobStatus.RUNNING) {
    	Assert.fail("Job not in state RUNNING.");
    }
    ```
    
    In its current form this doesn't really provide value imo. (Yes, it's asynchronous, but in which tests do we really even need that?)
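
If asynchrony is not needed in the test, one way to fold the loop and the final check into a single call is a blocking helper along the lines below. This is only a sketch under that assumption: `PollingSketch` and `awaitMatching` are hypothetical names, and the helper takes a plain `Supplier` so that the caller decides how to obtain each value (for instance by blocking on `getJobStatus(...)` inside the lambda).

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical test helper: polls synchronously and fails if the condition is never met.
final class PollingSketch {

    static <T> T awaitMatching(
            Supplier<T> poll,
            Predicate<T> accept,
            Duration timeout,
            Duration delay) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        T value = poll.get();
        while (!accept.test(value)) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                // Replaces the separate "if (status != RUNNING) fail(...)" check after the loop.
                throw new AssertionError("Condition not met before deadline; last value: " + value);
            }
            try {
                Thread.sleep(delay.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
            value = poll.get();
        }
        return value;
    }
}
```

The returned value always satisfies the predicate, so the call site needs no follow-up assertion.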


---