You are viewing a plain text version of this content. The canonical version is available in the original mailing-list archive.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:57 UTC

[38/52] [abbrv] flink git commit: [FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java

[FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1da79241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1da79241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1da79241

Branch: refs/heads/master
Commit: 1da7924187c7dc9dc003a77ad0b2752a24e42cb3
Parents: 9ad9fa9
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 12:10:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/RpcServiceUtils.java      | 10 +++++++++-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 20 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1da79241/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index d40e336..1ac54ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -50,7 +50,15 @@ public class RpcServiceUtils {
 		final ActorSystem actorSystem;
 
 		try {
-			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			Config akkaConfig;
+
+			if (hostname != null && !hostname.isEmpty()) {
+				// remote akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			} else {
+				// local akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			}
 
 			LOG.debug("Using akka configuration \n {}.", akkaConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1da79241/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 4fe39b6..d94eb7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -114,8 +114,26 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  /**
+    * Return a remote Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @param hostname to bind against. If null, then the loopback interface is used
+    * @param port to bind against
+    * @return A remote Akka config
+    */
   def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
-    getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
+    getAkkaConfig(configuration, Some((hostname, port)))
+  }
+
+  /**
+    * Return a local Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @return A local Akka config
+    */
+  def getAkkaConfig(configuration: Configuration): Config = {
+    getAkkaConfig(configuration, None)
   }
 
   /**