You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:28 UTC
[48/50] [abbrv] flink git commit: [hotfix] [cluster management]
Remove scala dependencies from MiniCluster.java
[hotfix] [cluster management] Remove scala dependencies from MiniCluster.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ba580bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ba580bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ba580bc
Branch: refs/heads/flip-6
Commit: 7ba580bc07ff5b1d2ae9be1a738f51ffdf6b03db
Parents: dbf4b99
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 14:41:52 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:50:36 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 12 ++++--------
.../org/apache/flink/runtime/akka/AkkaUtils.scala | 17 +++++++++++++++++
.../runtime/minicluster/MiniClusterITCase.java | 2 +-
3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ee38e0..d85234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.ExceptionUtils;
-import scala.Option;
-import scala.Tuple2;
-
import javax.annotation.concurrent.GuardedBy;
import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -348,7 +345,7 @@ public class MiniCluster {
/**
* Factory method to instantiate the RPC service.
*
- * @param config
+ * @param configuration
* The configuration of the mini cluster
* @param askTimeout
* The default RPC timeout for asynchronous "ask" requests.
@@ -360,17 +357,16 @@ public class MiniCluster {
* @return The instantiated RPC service
*/
protected RpcService createRpcService(
- Configuration config,
+ Configuration configuration,
Time askTimeout,
boolean remoteEnabled,
String bindAddress) {
ActorSystem actorSystem;
if (remoteEnabled) {
- Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
- actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+ actorSystem = AkkaUtils.createActorSystem(configuration, bindAddress, 0);
} else {
- actorSystem = AkkaUtils.createLocalActorSystem(config);
+ actorSystem = AkkaUtils.createLocalActorSystem(configuration);
}
return new AkkaRpcService(actorSystem, askTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9463bfe..45c7418 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -56,6 +56,23 @@ object AkkaUtils {
}
/**
+ * Creates an actor system bound to the given hostname and port.
+ *
+ * @param configuration instance containing the user provided configuration values
+ * @param hostname of the network interface to bind to
+ * @param port to bind to
+ * @return created actor system
+ */
+ def createActorSystem(
+ configuration: Configuration,
+ hostname: String,
+ port: Int)
+ : ActorSystem = {
+
+ createActorSystem(configuration, Some((hostname, port)))
+ }
+
+ /**
* Creates an actor system. If a listening address is specified, then the actor system will listen
* on that address for messages from a remote actor system. If not, then a local actor system
* will be instantiated.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ba580bc/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index dd43337..ef53547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -31,7 +31,7 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
-// @Test
+ @Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();