You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/07 13:41:45 UTC

[flink] branch release-1.6 updated (d8cc768 -> 5d93520)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d8cc768  [FLINK-10293][streaming] Properly forward REST port for remote environments
     new f20315b  [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
     new 5d93520  [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  3 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)


[flink] 01/02: [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f20315b9d2f92e77a28ec9979e33d25c97c3d491
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:27:54 2018 +0200

    [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd..80a1d9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -95,7 +95,6 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -909,7 +908,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
 		final ExecutionGraph currentExecutionGraph = executionGraph;
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService);
 	}


[flink] 02/02: [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d93520ae2a051f141cc5e92cf73411ecbde4390
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:29:14 2018 +0200

    [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
---
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d..bc073c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ public interface JobMasterGateway extends
 	CompletableFuture<String> triggerSavepoint(
 		@Nullable final String targetDirectory,
 		final boolean cancelJob,
-		final Time timeout);
+		@RpcTimeout final Time timeout);
 
 	/**
 	 * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 82fdc94..86afa90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -96,6 +97,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -106,15 +108,19 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
@@ -744,6 +750,58 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+	 * is respected.
+	 */
+	@Test
+	public void testTriggerSavepointTimeout() throws Exception {
+		final JobMaster jobMaster = new JobMaster(
+			rpcService,
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jmResourceId,
+			jobGraph,
+			haServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices,
+			blobServer,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new NoOpOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader()) {
+
+			@Override
+			public CompletableFuture<String> triggerSavepoint(
+					@Nullable final String targetDirectory,
+					final boolean cancelJob,
+					final Time timeout) {
+				return new CompletableFuture<>();
+			}
+		};
+
+		try {
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+			final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+			final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+			try {
+				savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+				fail();
+			} catch (final ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+			}
+
+			assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);