You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:24 UTC
[05/52] [abbrv] flink git commit: [hotfix] [cluster management]
Remove scala dependencies from MiniCluster.java
[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaf80a23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaf80a23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaf80a23
Branch: refs/heads/master
Commit: aaf80a23e0cf47a4e330eb4f973f21a0524b78b6
Parents: af924b4
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 12 ++++--------
.../org/apache/flink/runtime/akka/AkkaUtils.scala | 17 +++++++++++++++++
.../runtime/minicluster/MiniClusterITCase.java | 2 +-
3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.ExceptionUtils;
-import scala.Option;
-import scala.Tuple2;
-
import javax.annotation.concurrent.GuardedBy;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
/**
* Factory method to instantiate the RPC service.
*
- * @param config
+ * @param configuration
* The configuration of the mini cluster
* @param askTimeout
* The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
* @return The instantiated RPC service
*/
protected RpcService createRpcService(
- Configuration config,
+ Configuration configuration,
Time askTimeout,
boolean remoteEnabled,
String bindAddress) {
ActorSystem actorSystem;
if (remoteEnabled) {
- Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
- actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+ actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
} else {
- actorSystem = AkkaUtils.createLocalActorSystem(config);
+ actorSystem = AkkaUtils.createLocalActorSystem(configuration);
}
return new AkkaRpcService(actorSystem, askTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5210427..b25e29e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -58,6 +58,23 @@ object AkkaUtils {
}
/**
+ * Creates an actor system bound to the given hostname and port.
+ *
+ * @param configuration instance containing the user provided configuration values
+ * @param hostname of the network interface to bind to
+ * @param port to bind to
+ * @return created actor system
+ */
+ def createActorSystem(
+ configuration: Configuration,
+ hostname: String,
+ port: Int)
+ : ActorSystem = {
+
+ createActorSystem(configuration, Some((hostname, port)))
+ }
+
+ /**
* Creates an actor system. If a listening address is specified, then the actor system will listen
* on that address for messages from a remote actor system. If not, then a local actor system
* will be instantiated.
http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
-// @Test
+ @Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();