You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/14 10:51:44 UTC

[GitHub] dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint

dawidwys closed pull request #7093: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint
URL: https://github.com/apache/flink/pull/7093
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 22244f6cb8d..b8dc5545706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 public interface CheckpointCoordinatorGateway extends RpcGateway {
@@ -31,9 +32,5 @@ void acknowledgeCheckpoint(
 			final CheckpointMetrics checkpointMetrics,
 			final TaskStateSnapshot subtaskState);
 
-	void declineCheckpoint(
-			JobID jobID,
-			ExecutionAttemptID executionAttemptID,
-			long checkpointId,
-			Throwable cause);
+	void declineCheckpoint(DeclineCheckpoint declineCheckpoint);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..40a675aca31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -690,13 +690,7 @@ public void acknowledgeCheckpoint(
 
 	// TODO: This method needs a leader session ID
 	@Override
-	public void declineCheckpoint(
-			final JobID jobID,
-			final ExecutionAttemptID executionAttemptID,
-			final long checkpointID,
-			final Throwable reason) {
-		final DeclineCheckpoint decline = new DeclineCheckpoint(
-				jobID, executionAttemptID, checkpointID, reason);
+	public void declineCheckpoint(DeclineCheckpoint decline) {
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 
 		if (checkpointCoordinator != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5bbbc..2f656d09263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@ public static void terminateRpcService(RpcService rpcService, Time timeout) thro
 		rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
+	/**
+	 * Shuts the given rpc services down and waits for their termination.
+	 *
+	 * @param rpcServices to shut down
+	 * @param timeout for this operation
+	 * @throws InterruptedException if the operation has been interrupted
+	 * @throws ExecutionException if a problem occurred
+	 * @throws TimeoutException if a timeout occurred
+	 */
+	public static void terminateRpcServices(
+			Time timeout,
+			RpcService... rpcServices) throws InterruptedException, ExecutionException, TimeoutException {
+		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(rpcServices.length);
+
+		for (RpcService service : rpcServices) {
+			if (service != null) {
+				terminationFutures.add(service.stopService());
+			}
+		}
+
+		FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	// We don't want this class to be instantiable
 	private RpcUtils() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index c8f7357ab7e..10c9e2f123a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
@@ -57,6 +58,9 @@ public void declineCheckpoint(
 			long checkpointId,
 			Throwable cause) {
 
-		checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause);
+		checkpointCoordinatorGateway.declineCheckpoint(new DeclineCheckpoint(jobID,
+			executionAttemptID,
+			checkpointId,
+			cause));
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f3c72545e69..b1f02845c14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -33,6 +33,7 @@
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -84,14 +85,17 @@
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
@@ -106,12 +110,15 @@
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.SupplierWithException;
 
+import akka.actor.ActorSystem;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.After;
@@ -130,6 +137,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -248,6 +256,75 @@ public static void teardownClass() {
 		}
 	}
 
+	@Test
+	public void testDeclineCheckpointInvocationWithUserException() throws Exception {
+		RpcService rpcService1 = null;
+		RpcService rpcService2 = null;
+		try {
+			final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+			final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+
+			rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
+			rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);
+
+			final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
+
+			final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+			final JobMaster jobMaster = new JobMaster(
+				rpcService1,
+				jobMasterConfiguration,
+				jmResourceId,
+				jobGraph,
+				haServices,
+				DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
+				jobManagerSharedServices,
+				heartbeatServices,
+				blobServer,
+				UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+				new NoOpOnCompletionActions(),
+				testingFatalErrorHandler,
+				JobMasterTest.class.getClassLoader()) {
+				@Override
+				public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
+					declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
+				}
+			};
+
+			jobMaster.start(jobMasterId, testingTimeout).get();
+
+			final String className = "UserException";
+			final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava(
+				temporaryFolder.newFolder(),
+				className + ".java",
+				String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }",
+					className,
+					className));
+
+			Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance();
+
+			CompletableFuture<JobMasterGateway> jobMasterGateway =
+				rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class);
+
+			jobMasterGateway.thenAccept(gateway -> {
+				gateway.declineCheckpoint(new DeclineCheckpoint(
+						jobGraph.getJobID(),
+						new ExecutionAttemptID(1, 1),
+						1,
+						userException
+					)
+				);
+			});
+
+			Throwable throwable = declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(),
+				TimeUnit.MILLISECONDS);
+			assertThat(throwable, instanceOf(SerializedThrowable.class));
+			assertThat(throwable.getMessage(), equalTo(userException.getMessage()));
+		} finally {
+			RpcUtils.terminateRpcServices(testingTimeout, rpcService1, rpcService2);
+		}
+	}
+
 	@Test
 	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
 		final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index c9c55a1da8f..f5f7f8e3415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -144,7 +145,7 @@
 	private final Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer;
 
 	@Nonnull
-	private final Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer;
+	private final Consumer<DeclineCheckpoint> declineCheckpointConsumer;
 
 	@Nonnull
 	private final Supplier<JobMasterId> fencingTokenSupplier;
@@ -183,7 +184,7 @@ public TestingJobMasterGateway(
 			@Nonnull Function<JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
 			@Nonnull BiConsumer<AllocationID, Throwable> notifyAllocationFailureConsumer,
 			@Nonnull Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer,
-			@Nonnull Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer,
+			@Nonnull Consumer<DeclineCheckpoint> declineCheckpointConsumer,
 			@Nonnull Supplier<JobMasterId> fencingTokenSupplier,
 			@Nonnull BiFunction<JobID, String, CompletableFuture<KvStateLocation>> requestKvStateLocationFunction,
 			@Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction,
@@ -335,8 +336,8 @@ public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttem
 	}
 
 	@Override
-	public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, Throwable cause) {
-		declineCheckpointConsumer.accept(Tuple4.of(jobID, executionAttemptID, checkpointId, cause));
+	public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
+		declineCheckpointConsumer.accept(declineCheckpoint);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index e40b752f248..b52df9ee052 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -40,6 +40,7 @@
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -96,7 +97,7 @@
 	private Function<JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction = ignored -> CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(null));
 	private BiConsumer<AllocationID, Throwable> notifyAllocationFailureConsumer = (ignoredA, ignoredB) -> {};
 	private Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer = ignored -> {};
-	private Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer = ignored -> {};
+	private Consumer<DeclineCheckpoint> declineCheckpointConsumer = ignored -> {};
 	private Supplier<JobMasterId> fencingTokenSupplier = () -> JOB_MASTER_ID;
 	private BiFunction<JobID, String, CompletableFuture<KvStateLocation>> requestKvStateLocationFunction = (ignoredA, registrationName) -> FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
 	private Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get());
@@ -222,7 +223,7 @@ public TestingJobMasterGatewayBuilder setAcknowledgeCheckpointConsumer(Consumer<
 		return this;
 	}
 
-	public TestingJobMasterGatewayBuilder setDeclineCheckpointConsumer(Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer) {
+	public TestingJobMasterGatewayBuilder setDeclineCheckpointConsumer(Consumer<DeclineCheckpoint> declineCheckpointConsumer) {
 		this.declineCheckpointConsumer = declineCheckpointConsumer;
 		return this;
 	}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services