You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/07 13:42:37 UTC

[flink] branch release-1.5 updated (a1cc687 -> d1c77f9)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a1cc687  [FLINK-10261] [table] Fix INSERT INTO with ORDER BY clause
     new 95b198d  [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
     new d1c77f9  [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  3 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)


[flink] 02/02: [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1c77f9686c36bf09939efd9bbf07ac554b41902
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:29:14 2018 +0200

    [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
---
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d..bc073c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ public interface JobMasterGateway extends
 	CompletableFuture<String> triggerSavepoint(
 		@Nullable final String targetDirectory,
 		final boolean cancelJob,
-		final Time timeout);
+		@RpcTimeout final Time timeout);
 
 	/**
 	 * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7dc017..b231b04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -80,6 +80,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -93,6 +94,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -103,13 +105,17 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
@@ -703,6 +709,58 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+	 * is respected.
+	 */
+	@Test
+	public void testTriggerSavepointTimeout() throws Exception {
+		final JobMaster jobMaster = new JobMaster(
+			rpcService,
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jmResourceId,
+			jobGraph,
+			haServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices,
+			blobServer,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new NoOpOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader()) {
+
+			@Override
+			public CompletableFuture<String> triggerSavepoint(
+					@Nullable final String targetDirectory,
+					final boolean cancelJob,
+					final Time timeout) {
+				return new CompletableFuture<>();
+			}
+		};
+
+		try {
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+			final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+			final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+			try {
+				savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+				fail();
+			} catch (final ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+			}
+
+			assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);


[flink] 01/02: [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95b198d31cb6a84d69fd8d7f5854f5d7f766e7b0
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:27:54 2018 +0200

    [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6caa301..8b34a78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -94,7 +94,6 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -908,7 +907,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
 		final ExecutionGraph currentExecutionGraph = executionGraph;
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService);
 	}