You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/07 13:41:47 UTC

[flink] 02/02: [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d93520ae2a051f141cc5e92cf73411ecbde4390
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:29:14 2018 +0200

    [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
---
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d..bc073c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ public interface JobMasterGateway extends
 	CompletableFuture<String> triggerSavepoint(
 		@Nullable final String targetDirectory,
 		final boolean cancelJob,
-		final Time timeout);
+		@RpcTimeout final Time timeout);
 
 	/**
 	 * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 82fdc94..86afa90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -96,6 +97,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -106,15 +108,19 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
@@ -744,6 +750,58 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+	 * is respected.
+	 */
+	@Test
+	public void testTriggerSavepointTimeout() throws Exception {
+		final JobMaster jobMaster = new JobMaster(
+			rpcService,
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jmResourceId,
+			jobGraph,
+			haServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices,
+			blobServer,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new NoOpOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader()) {
+
+			@Override
+			public CompletableFuture<String> triggerSavepoint(
+					@Nullable final String targetDirectory,
+					final boolean cancelJob,
+					final Time timeout) {
+				return new CompletableFuture<>();
+			}
+		};
+
+		try {
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+			final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+			final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+			try {
+				savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+				fail();
+			} catch (final ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+			}
+
+			assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);