You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/07 13:39:48 UTC

[flink] branch master updated (b0522e3 -> 1a5dea6)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b0522e3  [FLINK-10242][metrics] Disable latency metrics by default
     new e59c883  [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
     new bfbcd90  [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
     new 484dd31  [hotfix][tests] Fix checkstyle violations in JobMasterTest.
     new 1a5dea6  [hotfix][tests] Make JobMasterTest#EMPTY_TESTING_INPUT_SPLITS private.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  3 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 70 ++++++++++++++++++++--
 3 files changed, 66 insertions(+), 9 deletions(-)


[flink] 03/04: [hotfix][tests] Fix checkstyle violations in JobMasterTest.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 484dd31a785598a8fe20d302aac52f83c7cf8c24
Author: gyao <ga...@data-artisans.com>
AuthorDate: Fri Aug 31 08:19:10 2018 +0200

    [hotfix][tests] Fix checkstyle violations in JobMasterTest.
---
 .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d70cc31..30db715 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -424,7 +424,7 @@ public class JobMasterTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that an existing checkpoint will have precedence over an savepoint
+	 * Tests that an existing checkpoint will have precedence over an savepoint.
 	 */
 	@Test
 	public void testCheckpointPrecedesSavepointRecovery() throws Exception {
@@ -476,7 +476,7 @@ public class JobMasterTest extends TestLogger {
 
 	/**
 	 * Tests that the JobMaster retries the scheduling of a job
-	 * in case of a missing slot offering from a registered TaskExecutor
+	 * in case of a missing slot offering from a registered TaskExecutor.
 	 */
 	@Test
 	public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
@@ -880,9 +880,9 @@ public class JobMasterTest extends TestLogger {
 			final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
 			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
 				.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-					  tddFuture.complete(taskDeploymentDescriptor);
-					  return CompletableFuture.completedFuture(Acknowledge.get());
-				  })
+					tddFuture.complete(taskDeploymentDescriptor);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
 				.createTestingTaskExecutorGateway();
 			rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
 


[flink] 04/04: [hotfix][tests] Make JobMasterTest#EMPTY_TESTING_INPUT_SPLITS private.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a5dea611c15083b4acfe2c937fffbdc00c620a3
Author: gyao <ga...@data-artisans.com>
AuthorDate: Fri Aug 31 08:19:58 2018 +0200

    [hotfix][tests] Make JobMasterTest#EMPTY_TESTING_INPUT_SPLITS private.
---
 .../src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 30db715..9a2bc97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -145,7 +145,7 @@ import static org.junit.Assert.fail;
  */
 public class JobMasterTest extends TestLogger {
 
-	static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
+	private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();


[flink] 01/04: [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e59c883b2bd01eb5eb6b9c1db05006ec737bfd4f
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:27:54 2018 +0200

    [hotfix] Remove @RpcTimeout from JobMaster.requestJobDetails.
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 01cb2b6..4a66d32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -95,7 +95,6 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPre
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -909,7 +908,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
 		final ExecutionGraph currentExecutionGraph = executionGraph;
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService);
 	}


[flink] 02/04: [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfbcd908bf9b59f64ddf8c783293addc098ee516
Author: gyao <ga...@data-artisans.com>
AuthorDate: Wed Aug 22 13:29:14 2018 +0200

    [FLINK-10193][runtime] Add @RpcTimeout to JobMasterGateway.triggerSavepoint.
    
    This closes #6601.
---
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d..bc073c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ public interface JobMasterGateway extends
 	CompletableFuture<String> triggerSavepoint(
 		@Nullable final String targetDirectory,
 		final boolean cancelJob,
-		final Time timeout);
+		@RpcTimeout final Time timeout);
 
 	/**
 	 * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 66ca769..d70cc31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -92,6 +92,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -108,6 +109,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -122,17 +124,21 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
@@ -917,6 +923,58 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+	 * is respected.
+	 */
+	@Test
+	public void testTriggerSavepointTimeout() throws Exception {
+		final JobMaster jobMaster = new JobMaster(
+			rpcService,
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jmResourceId,
+			jobGraph,
+			haServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices,
+			blobServer,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new NoOpOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader()) {
+
+			@Override
+			public CompletableFuture<String> triggerSavepoint(
+					@Nullable final String targetDirectory,
+					final boolean cancelJob,
+					final Time timeout) {
+				return new CompletableFuture<>();
+			}
+		};
+
+		try {
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+			final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+			final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+			try {
+				savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+				fail();
+			} catch (final ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+			}
+
+			assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private JobGraph producerConsumerJobGraph() {
 		final JobVertex producer = new JobVertex("Producer");
 		producer.setInvokableClass(NoOpInvokable.class);