You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:40 UTC
[7/9] flink git commit: [hotfix] Make ActorSystemLoader in ClusterClient configurable
[hotfix] Make ActorSystemLoader in ClusterClient configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b946c5e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b946c5e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b946c5e5
Branch: refs/heads/master
Commit: b946c5e569eeabf90666bbda9c458bca71029a7f
Parents: e64d4db
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:24:40 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClient.java | 48 +++++++++++++++++---
.../client/program/StandaloneClusterClient.java | 4 ++
.../apache/flink/yarn/YarnClusterClient.java | 4 +-
3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c082b10..a157d34 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -104,8 +104,8 @@ public abstract class ClusterClient<T> {
/** The optimizer used in the optimization of batch programs. */
final Optimizer compiler;
- /** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
- protected final LazyActorSystemLoader actorSystemLoader;
+ /** The actor system used to communicate with the JobManager. */
+ protected final ActorSystemLoader actorSystemLoader;
/** Configuration of the client. */
protected final Configuration flinkConfig;
@@ -171,7 +171,10 @@ public abstract class ClusterClient<T> {
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
* @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
*/
- public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
+ public ClusterClient(
+ Configuration flinkConfig,
+ HighAvailabilityServices highAvailabilityServices,
+ boolean sharedHaServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
@@ -188,14 +191,45 @@ public abstract class ClusterClient<T> {
this.sharedHaServices = sharedHaServices;
}
+ public ClusterClient(
+ Configuration flinkConfig,
+ HighAvailabilityServices highAvailabilityServices,
+ boolean sharedHaServices,
+ ActorSystemLoader actorSystemLoader) {
+ this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+ this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+
+ this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
+ this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
+
+ this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader);
+
+ this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+ this.sharedHaServices = sharedHaServices;
+ }
+
// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------
/**
+ * Interface which allows to load an {@link ActorSystem}.
+ */
+ public interface ActorSystemLoader extends AutoCloseable {
+
+ /**
+ * Get an {@link ActorSystem}.
+ *
+ * @return {@link ActorSystem}
+ * @throws FlinkException
+ */
+ ActorSystem get() throws FlinkException;
+ }
+
+ /**
* Utility class to lazily instantiate an {@link ActorSystem}.
*/
- protected static class LazyActorSystemLoader {
+ protected static class LazyActorSystemLoader implements ActorSystemLoader {
private final Logger log;
@@ -226,7 +260,8 @@ public abstract class ClusterClient<T> {
return actorSystem != null;
}
- public void shutdown() {
+ @Override
+ public void close() throws Exception {
if (isLoaded()) {
actorSystem.shutdown();
actorSystem.awaitTermination();
@@ -239,6 +274,7 @@ public abstract class ClusterClient<T> {
* @return ActorSystem
* @throws Exception if the ActorSystem could not be created
*/
+ @Override
public ActorSystem get() throws FlinkException {
if (!isLoaded()) {
@@ -276,7 +312,7 @@ public abstract class ClusterClient<T> {
*/
public void shutdown() throws Exception {
synchronized (this) {
- actorSystemLoader.shutdown();
+ actorSystemLoader.close();
if (!sharedHaServices && highAvailabilityServices != null) {
highAvailabilityServices.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index e502add..caee34f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -51,6 +51,10 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId>
super(config, highAvailabilityServices, sharedHaServices);
}
+ public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices, ActorSystemLoader actorSystemLoader) {
+ super(config, highAvailabilityServices, sharedHaServices, actorSystemLoader);
+ }
+
@Override
public void waitForClusterToBeReady() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 0d7546e..a5aca5d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -285,14 +285,14 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
private static class LazApplicationClientLoader {
private final Configuration flinkConfig;
- private final LazyActorSystemLoader actorSystemLoader;
+ private final ActorSystemLoader actorSystemLoader;
private final HighAvailabilityServices highAvailabilityServices;
private ActorRef applicationClient;
private LazApplicationClientLoader(
Configuration flinkConfig,
- LazyActorSystemLoader actorSystemLoader,
+ ActorSystemLoader actorSystemLoader,
HighAvailabilityServices highAvailabilityServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");