You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/07 13:39:50 UTC
[flink] 02/04: [FLINK-10193][runtime] Add @RpcTimeout to
JobMasterGateway.triggerSavepoint.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfbcd908bf9b59f64ddf8c783293addc098ee516
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:29:14 2018 +0200
[FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
This closes #6601.
---
.../flink/runtime/jobmaster/JobMasterGateway.java | 2 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 58 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d..bc073c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ public interface JobMasterGateway extends
CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory,
final boolean cancelJob,
- final Time timeout);
+ @RpcTimeout final Time timeout);
/**
* Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 66ca769..d70cc31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -92,6 +92,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
@@ -108,6 +109,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.FileOutputStream;
@@ -122,17 +124,21 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for {@link JobMaster}.
@@ -917,6 +923,58 @@ public class JobMasterTest extends TestLogger {
}
}
+ /**
+ * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+ * is respected.
+ */
+ @Test
+ public void testTriggerSavepointTimeout() throws Exception {
+ final JobMaster jobMaster = new JobMaster(
+ rpcService,
+ JobMasterConfiguration.fromConfiguration(configuration),
+ jmResourceId,
+ jobGraph,
+ haServices,
+ DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices,
+ blobServer,
+ UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+ new NoOpOnCompletionActions(),
+ testingFatalErrorHandler,
+ JobMasterTest.class.getClassLoader()) {
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ @Nullable final String targetDirectory,
+ final boolean cancelJob,
+ final Time timeout) {
+ return new CompletableFuture<>();
+ }
+ };
+
+ try {
+ final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+ startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+ final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+ final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+ try {
+ savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+ fail();
+ } catch (final ExecutionException e) {
+ final Throwable cause = ExceptionUtils.stripExecutionException(e);
+ assertThat(cause, instanceOf(TimeoutException.class));
+ }
+
+ assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+ }
+ }
+
private JobGraph producerConsumerJobGraph() {
final JobVertex producer = new JobVertex("Producer");
producer.setInvokableClass(NoOpInvokable.class);