You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/30 16:58:40 UTC
[3/3] flink git commit: [hotfix] Pass in Rest address to Dispatcher
as nullable String
[hotfix] Pass in Rest address to Dispatcher as nullable String
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0e82dca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0e82dca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0e82dca
Branch: refs/heads/master
Commit: f0e82dca3b81f75718c8cdabbb586595170a4f1a
Parents: dcbc966
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:59:28 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 30 17:57:36 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 18 +++++++++++-------
.../runtime/dispatcher/StandaloneDispatcher.java | 4 ++--
.../entrypoint/SessionClusterEntrypoint.java | 7 ++++---
.../flink/runtime/dispatcher/DispatcherTest.java | 3 +--
4 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b22e7ab..1fa0f7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -56,13 +56,14 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -93,7 +94,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private final LeaderElectionService leaderElectionService;
- private final CompletableFuture<String> restAddressFuture;
+ @Nullable
+ protected final String restAddress;
protected Dispatcher(
RpcService rpcService,
@@ -105,7 +107,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
- Optional<String> restAddress) throws Exception {
+ @Nullable String restAddress) throws Exception {
super(rpcService, endpointId);
this.configuration = Preconditions.checkNotNull(configuration);
@@ -124,10 +126,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
jobManagerRunners = new HashMap<>(16);
leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
- this.restAddressFuture = restAddress
- .map(CompletableFuture::completedFuture)
- .orElse(FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));
+ this.restAddress = restAddress;
}
//------------------------------------------------------
@@ -275,7 +275,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
@Override
public CompletableFuture<String> requestRestAddress(Time timeout) {
- return restAddressFuture;
+ if (restAddress != null) {
+ return CompletableFuture.completedFuture(restAddress);
+ } else {
+ return FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint."));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5a6889e..3ba681c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import java.util.Optional;
+import javax.annotation.Nullable;
/**
* Dispatcher implementation which spawns a {@link JobMaster} for each
@@ -51,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
- Optional<String> restAddress) throws Exception {
+ @Nullable String restAddress) throws Exception {
super(
rpcService,
endpointId,
http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 3feb005..27ddf49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -49,7 +49,8 @@ import org.apache.flink.util.FlinkException;
import akka.actor.ActorSystem;
-import java.util.Optional;
+import javax.annotation.Nullable;
+
import java.util.concurrent.Executor;
/**
@@ -130,7 +131,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
heartbeatServices,
metricRegistry,
this,
- Optional.of(dispatcherRestEndpoint.getRestAddress()));
+ dispatcherRestEndpoint.getRestAddress());
LOG.debug("Starting ResourceManager.");
resourceManager.start();
@@ -214,7 +215,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
- Optional<String> restAddress) throws Exception {
+ @Nullable String restAddress) throws Exception {
// create the default dispatcher
return new StandaloneDispatcher(
http://git-wip-us.apache.org/repos/asf/flink/blob/f0e82dca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a511d45..d5b63d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -51,7 +51,6 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -224,7 +223,7 @@ public class DispatcherTest extends TestLogger {
heartbeatServices,
metricRegistry,
fatalErrorHandler,
- Optional.empty());
+ null);
this.jobManagerRunner = jobManagerRunner;
this.expectedJobId = expectedJobId;