You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:57 UTC
[38/52] [abbrv] flink git commit: [FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java
[FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1da79241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1da79241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1da79241
Branch: refs/heads/master
Commit: 1da7924187c7dc9dc003a77ad0b2752a24e42cb3
Parents: 9ad9fa9
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 12:10:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/rpc/RpcServiceUtils.java | 10 +++++++++-
.../apache/flink/runtime/akka/AkkaUtils.scala | 20 +++++++++++++++++++-
2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1da79241/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index d40e336..1ac54ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -50,7 +50,15 @@ public class RpcServiceUtils {
final ActorSystem actorSystem;
try {
- Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+ Config akkaConfig;
+
+ if (hostname != null && !hostname.isEmpty()) {
+ // remote akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+ } else {
+ // local akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+ }
LOG.debug("Using akka configuration \n {}.", akkaConfig);
http://git-wip-us.apache.org/repos/asf/flink/blob/1da79241/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 4fe39b6..d94eb7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -114,8 +114,26 @@ object AkkaUtils {
createActorSystem(getDefaultAkkaConfig)
}
+ /**
+ * Return a remote Akka config for the given configuration values.
+ *
+ * @param configuration containing the user provided configuration values
+ * @param hostname to bind against. If null, then the loopback interface is used
+ * @param port to bind against
+ * @return A remote Akka config
+ */
def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
- getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
+ getAkkaConfig(configuration, Some((hostname, port)))
+ }
+
+ /**
+ * Return a local Akka config for the given configuration values.
+ *
+ * @param configuration containing the user provided configuration values
+ * @return A local Akka config
+ */
+ def getAkkaConfig(configuration: Configuration): Config = {
+ getAkkaConfig(configuration, None)
}
/**