You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/28 16:22:16 UTC

[4/6] flink git commit: [FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java

[FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f4acec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f4acec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f4acec

Branch: refs/heads/flip-6
Commit: 53f4acec84a8a7a9ab7c623dfec9ca7b48d7251e
Parents: 8265b54
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 12:10:43 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:56 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/RpcServiceUtils.java      | 10 +++++++++-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 20 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index d40e336..1ac54ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -50,7 +50,15 @@ public class RpcServiceUtils {
 		final ActorSystem actorSystem;
 
 		try {
-			Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			Config akkaConfig;
+
+			if (hostname != null && !hostname.isEmpty()) {
+				// remote akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			} else {
+				// local akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			}
 
 			LOG.debug("Using akka configuration \n {}.", akkaConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ddb9fc3..5a620f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -114,8 +114,26 @@ object AkkaUtils {
     createActorSystem(getDefaultAkkaConfig)
   }
 
+  /**
+    * Return a remote Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @param hostname to bind against. If null, then the loopback interface is used
+    * @param port to bind against
+    * @return A remote Akka config
+    */
   def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
-    getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
+    getAkkaConfig(configuration, Some((hostname, port)))
+  }
+
+  /**
+    * Return a local Akka config for the given configuration values.
+    *
+    * @param configuration containing the user provided configuration values
+    * @return A local Akka config
+    */
+  def getAkkaConfig(configuration: Configuration): Config = {
+    getAkkaConfig(configuration, None)
   }
 
   /**