You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/30 16:58:40 UTC

[3/3] flink git commit: [hotfix] Pass in Rest address to Dispatcher as nullable String

[hotfix] Pass in Rest address to Dispatcher as nullable String


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0e82dca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0e82dca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0e82dca

Branch: refs/heads/master
Commit: f0e82dca3b81f75718c8cdabbb586595170a4f1a
Parents: dcbc966
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:59:28 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 30 17:57:36 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java      | 18 +++++++++++-------
 .../runtime/dispatcher/StandaloneDispatcher.java  |  4 ++--
 .../entrypoint/SessionClusterEntrypoint.java      |  7 ++++---
 .../flink/runtime/dispatcher/DispatcherTest.java  |  3 +--
 4 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b22e7ab..1fa0f7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -56,13 +56,14 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
@@ -93,7 +94,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final LeaderElectionService leaderElectionService;
 
-	private final CompletableFuture<String> restAddressFuture;
+	@Nullable
+	protected final String restAddress;
 
 	protected Dispatcher(
 			RpcService rpcService,
@@ -105,7 +107,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
-			Optional<String> restAddress) throws Exception {
+			@Nullable String restAddress) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -124,10 +126,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		jobManagerRunners = new HashMap<>(16);
 
 		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
-		this.restAddressFuture = restAddress
-			.map(CompletableFuture::completedFuture)
-			.orElse(FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));
 
+		this.restAddress = restAddress;
 	}
 
 	//------------------------------------------------------
@@ -275,7 +275,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<String> requestRestAddress(Time timeout) {
-		return restAddressFuture;
+		if (restAddress != null) {
+			return CompletableFuture.completedFuture(restAddress);
+		} else {
+			return FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint."));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5a6889e..3ba681c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import java.util.Optional;
+import javax.annotation.Nullable;
 
 /**
  * Dispatcher implementation which spawns a {@link JobMaster} for each
@@ -51,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
-			Optional<String> restAddress) throws Exception {
+			@Nullable String restAddress) throws Exception {
 		super(
 			rpcService,
 			endpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 3feb005..27ddf49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -49,7 +49,8 @@ import org.apache.flink.util.FlinkException;
 
 import akka.actor.ActorSystem;
 
-import java.util.Optional;
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Executor;
 
 /**
@@ -130,7 +131,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			heartbeatServices,
 			metricRegistry,
 			this,
-			Optional.of(dispatcherRestEndpoint.getRestAddress()));
+			dispatcherRestEndpoint.getRestAddress());
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
@@ -214,7 +215,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
 		FatalErrorHandler fatalErrorHandler,
-		Optional<String> restAddress) throws Exception {
+		@Nullable String restAddress) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a511d45..d5b63d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -51,7 +51,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -224,7 +223,7 @@ public class DispatcherTest extends TestLogger {
 				heartbeatServices,
 				metricRegistry,
 				fatalErrorHandler,
-				Optional.empty());
+				null);
 
 			this.jobManagerRunner = jobManagerRunner;
 			this.expectedJobId = expectedJobId;