You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/28 16:22:16 UTC
[4/6] flink git commit: [FLINK-5170] [akka] Extend
AkkaUtils.getAkkaConfig methods to properly work with Java
[FLINK-5170] [akka] Extend AkkaUtils.getAkkaConfig methods to properly work with Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f4acec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f4acec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f4acec
Branch: refs/heads/flip-6
Commit: 53f4acec84a8a7a9ab7c623dfec9ca7b48d7251e
Parents: 8265b54
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 12:10:43 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 28 17:12:56 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/rpc/RpcServiceUtils.java | 10 +++++++++-
.../apache/flink/runtime/akka/AkkaUtils.scala | 20 +++++++++++++++++++-
2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index d40e336..1ac54ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -50,7 +50,15 @@ public class RpcServiceUtils {
final ActorSystem actorSystem;
try {
- Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+ Config akkaConfig;
+
+ if (hostname != null && !hostname.isEmpty()) {
+ // remote akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+ } else {
+ // local akka config
+ akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+ }
LOG.debug("Using akka configuration \n {}.", akkaConfig);
http://git-wip-us.apache.org/repos/asf/flink/blob/53f4acec/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index ddb9fc3..5a620f3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -114,8 +114,26 @@ object AkkaUtils {
createActorSystem(getDefaultAkkaConfig)
}
+ /**
+ * Return a remote Akka config for the given configuration values.
+ *
+ * @param configuration containing the user provided configuration values
+ * @param hostname to bind against. If null, then the loopback interface is used
+ * @param port to bind against
+ * @return A remote Akka config
+ */
def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
- getAkkaConfig(configuration, if (hostname != null) Some((hostname, port)) else None)
+ getAkkaConfig(configuration, Some((hostname, port)))
+ }
+
+ /**
+ * Return a local Akka config for the given configuration values.
+ *
+ * @param configuration containing the user provided configuration values
+ * @return A local Akka config
+ */
+ def getAkkaConfig(configuration: Configuration): Config = {
+ getAkkaConfig(configuration, None)
}
/**