You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/22 15:45:40 UTC

[7/9] flink git commit: [hotfix] Make ActorSystemLoader in ClusterClient configurable

[hotfix] Make ActorSystemLoader in ClusterClient configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b946c5e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b946c5e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b946c5e5

Branch: refs/heads/master
Commit: b946c5e569eeabf90666bbda9c458bca71029a7f
Parents: e64d4db
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 21 15:24:40 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 22 17:45:04 2018 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 48 +++++++++++++++++---
 .../client/program/StandaloneClusterClient.java |  4 ++
 .../apache/flink/yarn/YarnClusterClient.java    |  4 +-
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c082b10..a157d34 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -104,8 +104,8 @@ public abstract class ClusterClient<T> {
 	/** The optimizer used in the optimization of batch programs. */
 	final Optimizer compiler;
 
-	/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
-	protected final LazyActorSystemLoader actorSystemLoader;
+	/** The actor system used to communicate with the JobManager. */
+	protected final ActorSystemLoader actorSystemLoader;
 
 	/** Configuration of the client. */
 	protected final Configuration flinkConfig;
@@ -171,7 +171,10 @@ public abstract class ClusterClient<T> {
 	 * @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
 	 * @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
 	 */
-	public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
+	public ClusterClient(
+			Configuration flinkConfig,
+			HighAvailabilityServices highAvailabilityServices,
+			boolean sharedHaServices) {
 		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
 
@@ -188,14 +191,45 @@ public abstract class ClusterClient<T> {
 		this.sharedHaServices = sharedHaServices;
 	}
 
+	public ClusterClient(
+			Configuration flinkConfig,
+			HighAvailabilityServices highAvailabilityServices,
+			boolean sharedHaServices,
+			ActorSystemLoader actorSystemLoader) {
+		this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+
+		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
+		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
+
+		this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader);
+
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+		this.sharedHaServices = sharedHaServices;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Interface which allows loading an {@link ActorSystem}.
+	 */
+	public interface ActorSystemLoader extends AutoCloseable {
+
+		/**
+		 * Get an {@link ActorSystem}.
+		 *
+		 * @return {@link ActorSystem}
+		 * @throws FlinkException if the {@link ActorSystem} could not be obtained
+		 */
+		ActorSystem get() throws FlinkException;
+	}
+
+	/**
 	 * Utility class to lazily instantiate an {@link ActorSystem}.
 	 */
-	protected static class LazyActorSystemLoader {
+	protected static class LazyActorSystemLoader implements ActorSystemLoader {
 
 		private final Logger log;
 
@@ -226,7 +260,8 @@ public abstract class ClusterClient<T> {
 			return actorSystem != null;
 		}
 
-		public void shutdown() {
+		@Override
+		public void close() throws Exception {
 			if (isLoaded()) {
 				actorSystem.shutdown();
 				actorSystem.awaitTermination();
@@ -239,6 +274,7 @@ public abstract class ClusterClient<T> {
 		 * @return ActorSystem
 		 * @throws Exception if the ActorSystem could not be created
 		 */
+		@Override
 		public ActorSystem get() throws FlinkException {
 
 			if (!isLoaded()) {
@@ -276,7 +312,7 @@ public abstract class ClusterClient<T> {
 	 */
 	public void shutdown() throws Exception {
 		synchronized (this) {
-			actorSystemLoader.shutdown();
+			actorSystemLoader.close();
 
 			if (!sharedHaServices && highAvailabilityServices != null) {
 				highAvailabilityServices.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index e502add..caee34f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -51,6 +51,10 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId>
 		super(config, highAvailabilityServices, sharedHaServices);
 	}
 
+	public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices, ActorSystemLoader actorSystemLoader) {
+		super(config, highAvailabilityServices, sharedHaServices, actorSystemLoader);
+	}
+
 	@Override
 	public void waitForClusterToBeReady() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b946c5e5/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 0d7546e..a5aca5d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -285,14 +285,14 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 	private static class LazApplicationClientLoader {
 
 		private final Configuration flinkConfig;
-		private final LazyActorSystemLoader actorSystemLoader;
+		private final ActorSystemLoader actorSystemLoader;
 		private final HighAvailabilityServices highAvailabilityServices;
 
 		private ActorRef applicationClient;
 
 		private LazApplicationClientLoader(
 				Configuration flinkConfig,
-				LazyActorSystemLoader actorSystemLoader,
+				ActorSystemLoader actorSystemLoader,
 				HighAvailabilityServices highAvailabilityServices) {
 			this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
 			this.actorSystemLoader = Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");