You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:24 UTC

[05/52] [abbrv] flink git commit: [hotfix] [cluster management] Remove scala dependencies from MiniCluster.java

[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaf80a23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaf80a23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaf80a23

Branch: refs/heads/master
Commit: aaf80a23e0cf47a4e330eb4f973f21a0524b78b6
Parents: af924b4
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java     | 12 ++++--------
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 17 +++++++++++++++++
 .../runtime/minicluster/MiniClusterITCase.java     |  2 +-
 3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 
-import scala.Option;
-import scala.Tuple2;
-
 import javax.annotation.concurrent.GuardedBy;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
 	/**
 	 * Factory method to instantiate the RPC service.
 	 * 
-	 * @param config
+	 * @param configuration
 	 *            The configuration of the mini cluster
 	 * @param askTimeout
 	 *            The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
 	 * @return The instantiated RPC service
 	 */
 	protected RpcService createRpcService(
-			Configuration config,
+			Configuration configuration,
 			Time askTimeout,
 			boolean remoteEnabled,
 			String bindAddress) {
 
 		ActorSystem actorSystem;
 		if (remoteEnabled) {
-			Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
-			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+			actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
 		} else {
-			actorSystem = AkkaUtils.createLocalActorSystem(config);
+			actorSystem = AkkaUtils.createLocalActorSystem(configuration);
 		}
 
 		return new AkkaRpcService(actorSystem, askTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5210427..b25e29e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -58,6 +58,23 @@ object AkkaUtils {
   }
 
   /**
+    * Creates an actor system bound to the given hostname and port.
+    *
+    * @param configuration instance containing the user provided configuration values
+    * @param hostname of the network interface to bind to
+    * @param port to bind to
+    * @return created actor system
+    */
+  def createActorSystem(
+      configuration: Configuration,
+      hostname: String,
+      port: Int)
+    : ActorSystem = {
+
+    createActorSystem(configuration, Some((hostname, port)))
+  }
+
+  /**
    * Creates an actor system. If a listening address is specified, then the actor system will listen
    * on that address for messages from a remote actor system. If not, then a local actor system
    * will be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/aaf80a23/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
-//	@Test
+	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();