You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:59 UTC

[63/82] [abbrv] incubator-flink git commit: Add option to use single actor system for local execution. Use local connection manager if a single task manager is used for local execution. Remove synchronized block in getReceiverList of ChannelManager which

Add option to use single actor system for local execution. Use local connection manager if a single task manager is used for local execution. Remove synchronized block in getReceiverList of ChannelManager which effectively serialized the connection lookup calls of a single task manager.

Fix Java6 problem that File has no method toPath


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c93d9eaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c93d9eaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c93d9eaf

Branch: refs/heads/master
Commit: c93d9eaf363a535dff25cc4e7db400d879e73bb1
Parents: a0f7785
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 16 11:33:43 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:32 2014 +0100

----------------------------------------------------------------------
 .gitignore                                      |  1 -
 .../api/avro/AvroExternalJarProgramITCase.java  |  2 +-
 .../flink/streaming/util/ClusterUtil.java       |  2 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  3 +-
 .../scala/org/apache/flink/yarn/YarnUtils.scala | 33 ++++++--
 .../org/apache/flink/client/LocalExecutor.java  |  2 +-
 .../runtime/io/network/ChannelManager.java      | 12 +--
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 80 ++++++++++++++++----
 .../apache/flink/runtime/client/JobClient.scala |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  6 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  | 38 ++++++++--
 .../minicluster/LocalFlinkMiniCluster.scala     | 26 +++++--
 .../flink/runtime/taskmanager/TaskManager.scala |  7 +-
 .../runtime/testingUtils/TestingCluster.scala   |  4 +-
 .../runtime/testingUtils/TestingUtils.scala     | 10 +--
 .../flink/test/util/AbstractTestBase.java       |  2 +-
 .../test/util/MultipleProgramsTestBase.java     |  1 -
 .../apache/flink/test/util/TestBaseUtils.java   |  3 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  8 +-
 .../PackagedProgramEndToEndITCase.java          |  2 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala | 12 +--
 pom.xml                                         |  1 -
 22 files changed, 181 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0afd000..b519d40 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,3 @@ tmp
 .DS_Store
 _site
 docs/api
-atlassian-ide-plugin.xml

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 78a6683..0b394e3 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -44,7 +44,7 @@ public class AvroExternalJarProgramITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-			testMiniCluster = new ForkableFlinkMiniCluster(config);
+			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
 			
 			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index f75db68..8fb6554 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -57,7 +57,7 @@ public class ClusterUtil {
 		}
 
 		try {
-			exec = new LocalFlinkMiniCluster(configuration);
+			exec = new LocalFlinkMiniCluster(configuration, true);
 
 			Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
 					configuration, ClusterUtil.class.getClassLoader());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 58ce6cf..4a6e8cb 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -85,7 +85,8 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
 
           writeYarnProperties(address)
 
-          jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
+          jobManager = Some(AkkaUtils.getReference(JobManager.getRemoteAkkaURL(address))(system,
+            timeout))
           jobManager.get ! RegisterMessageListener
 
           pollingTimer foreach {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index d71273a..86b06e1 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -32,7 +32,7 @@ object YarnUtils {
     AkkaUtils.createActorSystem(akkaConfig)
   }
 
-  def createActorSystem: ActorSystem = {
+  def createActorSystem(): ActorSystem = {
     val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
       getConfigString)
 
@@ -40,12 +40,31 @@ object YarnUtils {
   }
 
   def getConfigString: String = {
-    s"""akka.loglevel = "INFO"
-      |akka.stdout-loglevel = "INFO"
-      |akka.log-dead-letters-during-shutdown = off
-      |akka.log-dead-letters = off
-      |akka.remote.log-remote-lifecycle-events=off
-      |""".stripMargin
+    """
+    |akka{
+    |  loglevel = "INFO"
+    |  stdout-loglevel = "INFO"
+    |  log-dead-letters-during-shutdown = off
+    |  log-dead-letters = off
+    |
+    |  actor {
+    |    provider = "akka.remote.RemoteActorRefProvider"
+    |  }
+    |
+    |  remote{
+    |    log-remote-lifecycle-events = off
+    |
+    |    netty{
+    |      tcp{
+    |        port = 0
+    |        transport-class = "akka.remote.transport.netty.NettyTransport"
+    |        tcp-nodelay = on
+    |        maximum-frame-size = 1MB
+    |        execution-pool-size = 4
+    |      }
+    |    }
+    |  }
+    |}""".stripMargin
   }
 
   def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 55fda89..6691cdf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -100,7 +100,7 @@ public class LocalExecutor extends PlanExecutor {
 				configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
 				configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
 				// start it up
-				this.flink = new LocalFlinkMiniCluster(configuration);
+				this.flink = new LocalFlinkMiniCluster(configuration, true);
 			} else {
 				throw new IllegalStateException("The local executor was already started.");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index cd07cc3..c841872 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -380,15 +380,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 
 		while (true) {
 			ConnectionInfoLookupResponse lookupResponse;
-			synchronized (this.channelLookup) {
-				try{
-					lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
-							new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-									sourceChannelID), timeout).response();
-				}catch(IOException ioe) {
-					throw ioe;
-				}
-			}
+			lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
+					new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
+							sourceChannelID), timeout).response();
 
 			if (lookupResponse.receiverReady()) {
 				receiverList = new EnvelopeReceiverList(lookupResponse);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index d52dde2..229c229 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -44,6 +44,10 @@ object AkkaUtils {
     createActorSystem(getDefaultActorSystemConfig)
   }
 
+  def createLocalActorSystem(): ActorSystem = {
+    createActorSystem(getDefaultLocalActorSystemConfig)
+  }
+
   def createActorSystem(akkaConfig: Config): ActorSystem = {
     ActorSystem.create("flink", akkaConfig)
   }
@@ -133,20 +137,47 @@ object AkkaUtils {
     getDefaultActorSystemConfigString + configString
   }
 
+  def getLocalConfigString(configuration: Configuration): String = {
+    val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
+      ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
+    val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
+      ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+
+    val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
+
+    val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
+      ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
+
+    val configString =
+      s"""
+         |akka {
+         |  loglevel = $logLevel
+         |  stdout-loglevel = $logLevel
+         |
+         |  log-dead-letters = $logLifecycleEvents
+         |  log-dead-letters-during-shutdown = $logLifecycleEvents
+         |
+         |  actor{
+         |    default-dispatcher{
+         |      executor = "default-executor"
+         |
+         |      throughput = ${akkaThroughput}
+         |
+         |      fork-join-executor {
+         |        parallelism-factor = 2.0
+         |      }
+         |    }
+         |  }
+         |
+         |}
+       """.stripMargin
+
+    getDefaultLocalActorSystemConfigString + configString
+  }
+
   def getDefaultActorSystemConfigString: String = {
-    """
+    val config = """
        |akka {
-       |  daemonic = on
-       |
-       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
-       |  logger-startup-timeout = 30s
-       |  loglevel = "WARNING"
-       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-       |  stdout-loglevel = "WARNING"
-       |  jvm-exit-on-fatal-error = off
-       |  log-config-on-start = on
-       |  serialize-messages = on
-       |
        |  actor {
        |    provider = "akka.remote.RemoteActorRefProvider"
        |  }
@@ -164,7 +195,26 @@ object AkkaUtils {
        |  }
        |}
      """.stripMargin
-    }
+
+    getDefaultLocalActorSystemConfigString + config
+  }
+
+  def getDefaultLocalActorSystemConfigString: String = {
+    """
+      |akka {
+      |  daemonic = on
+      |
+      |  loggers = ["akka.event.slf4j.Slf4jLogger"]
+      |  logger-startup-timeout = 30s
+      |  loglevel = "DEBUG"
+      |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+      |  stdout-loglevel = "DEBUG"
+      |  jvm-exit-on-fatal-error = off
+      |  log-config-on-start = off
+      |  serialize-messages = on
+      |}
+    """.stripMargin
+  }
 
   // scalastyle:off line.size.limit
 
@@ -347,6 +397,10 @@ object AkkaUtils {
     ConfigFactory.parseString(getDefaultActorSystemConfigString)
   }
 
+  def getDefaultLocalActorSystemConfig = {
+    ConfigFactory.parseString(getDefaultLocalActorSystemConfigString)
+  }
+
   def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
   FiniteDuration): ActorRef = {
     Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 53f0daa..9377069 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -107,7 +107,7 @@ object JobClient{
             "configuration.")
         }
 
-        JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+        JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6dbcb67..8752bef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -58,7 +58,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
 
   Execution.timeout = timeout;
 
-  log.info("Starting job manager.")
+  log.info(s"Starting job manager at ${self.path}.")
 
   val (archiveCount,
     profiling,
@@ -520,7 +520,7 @@ object JobManager {
     actorSystem.actorOf(props, JOB_MANAGER_NAME)
   }
 
-  def getAkkaURL(address: String): String = {
+  def getRemoteAkkaURL(address: String): String = {
     s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
   }
 
@@ -541,6 +541,6 @@ object JobManager {
 
   def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
   FiniteDuration): ActorRef = {
-    AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort))
+    AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bfb9ae2..1b7d452 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 
-abstract class FlinkMiniCluster(userConfiguration: Configuration) {
+abstract class FlinkMiniCluster(userConfiguration: Configuration,
+                                val singleActorSystem: Boolean) {
   import FlinkMiniCluster._
 
   val HOSTNAME = "localhost"
@@ -41,6 +42,10 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
 
   val configuration = generateConfiguration(userConfiguration)
 
+  if(singleActorSystem){
+    configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
+  }
+
   val jobManagerActorSystem = startJobManagerActorSystem()
   val jobManagerActor = startJobManager(jobManagerActorSystem)
 
@@ -48,7 +53,13 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
     .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
 
   val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield {
-    val actorSystem = startTaskManagerActorSystem(i)
+    val actorSystem = if(singleActorSystem) {
+      jobManagerActorSystem
+    }
+    else{
+      startTaskManagerActorSystem(i)
+    }
+
     (actorSystem, startTaskManager(i)(actorSystem))
   }
 
@@ -66,7 +77,12 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
     val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
       .DEFAULT_JOB_MANAGER_IPC_PORT)
 
-    AkkaUtils.getConfigString(HOSTNAME, port, configuration)
+    if(singleActorSystem){
+      AkkaUtils.getLocalConfigString(configuration)
+    }else{
+      AkkaUtils.getConfigString(HOSTNAME, port, configuration)
+    }
+
   }
 
   def startJobManagerActorSystem(): ActorSystem = {
@@ -111,13 +127,23 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   }
 
   def shutdown(): Unit = {
-    taskManagerActorSystems foreach { _.shutdown() }
+    if(!singleActorSystem){
+      taskManagerActorSystems foreach {
+        _.shutdown()
+      }
+    }
+
     jobManagerActorSystem.shutdown()
   }
 
   def awaitTermination(): Unit = {
     jobManagerActorSystem.awaitTermination()
-    taskManagerActorSystems foreach { _.awaitTermination()}
+
+    if(!singleActorSystem) {
+      taskManagerActorSystems foreach {
+        _.awaitTermination()
+      }
+    }
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index c1169d3..67053b2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -28,11 +28,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
-class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
-FlinkMiniCluster(userConfiguration){
+class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
+  extends FlinkMiniCluster(userConfiguration, singleActorSystem){
   import LocalFlinkMiniCluster._
 
-  val jobClientActorSystem = AkkaUtils.createActorSystem()
+  val jobClientActorSystem = if(singleActorSystem){
+    jobManagerActorSystem
+  }else{
+    AkkaUtils.createActorSystem()
+  }
+
   var jobClient: Option[ActorRef] = None
 
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
@@ -70,7 +75,7 @@ FlinkMiniCluster(userConfiguration){
     }
 
     val localExecution = if(numTaskManagers == 1){
-      false
+      true
     }else{
       false
     }
@@ -87,6 +92,10 @@ FlinkMiniCluster(userConfiguration){
         config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
         config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
 
+        if(singleActorSystem){
+          config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
+        }
+
         val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
         jobClient = Some(jc)
         jc
@@ -101,11 +110,16 @@ FlinkMiniCluster(userConfiguration){
 
   override def shutdown(): Unit = {
     super.shutdown()
-    jobClientActorSystem.shutdown()
+
+    if(!singleActorSystem) {
+      jobClientActorSystem.shutdown()
+    }
   }
 
   override def awaitTermination(): Unit = {
-    jobClientActorSystem.awaitTermination()
+    if(!singleActorSystem) {
+      jobClientActorSystem.awaitTermination()
+    }
     super.awaitTermination()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 253984c..a4040fa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -73,7 +73,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   import taskManagerConfig.{timeout => tmTimeout, _}
   implicit val timeout = tmTimeout
 
-
   log.info(s"Starting task manager at ${self.path}.")
 
   val REGISTRATION_DELAY = 0 seconds
@@ -558,7 +557,7 @@ object TaskManager {
             "configuration.")
         }
 
-        JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+        JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
     }
 
     val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
@@ -667,10 +666,6 @@ object TaskManager {
     system.actorOf(Props(classOf[TaskManagerProfiler], instancePath, reportInterval), PROFILER_NAME)
   }
 
-  def getAkkaURL(address: String): String = {
-    s"akka.tcp://flink@${address}/user/taskmanager"
-  }
-
   def checkTempDirs(tmpDirs: Array[String]): Unit = {
     tmpDirs.zipWithIndex.foreach {
       case (dir: String, _) =>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 5a51265..806d9b4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,7 +26,9 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 
-class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration) {
+class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration,
+  true) {
+
   override def generateConfiguration(userConfig: Configuration): Configuration = {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index ce54a74..ce068c9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -100,14 +100,14 @@ object TestingUtils {
       networkConnectionConfig) with TestingTaskManager))
   }
 
-  def startTestingCluster(numSlots: Int, numTaskManagers: Int = 1): FlinkMiniCluster = {
+  def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT):
+  FlinkMiniCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
+    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs)
     config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000)
-    config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT)
-    val cluster = new TestingCluster(config)
-    cluster
+    config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+    new TestingCluster(config)
   }
 
   def setGlobalExecutionContext(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 38b52ca..b24b3ee 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -58,7 +58,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
 
-	public void startCluster(){
+	public void startCluster() throws Exception{
 		this.executor = startCluster(numTaskManagers, taskManagerNumSlots);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 225936c..c86ccaa 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -46,7 +46,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 	@BeforeClass
 	public static void setup() throws Exception{
 		cluster = TestBaseUtils.startCluster(1, 4);
-
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7939a3c..fe2d7fc 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public class TestBaseUtils {
+
 	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
 
 	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
@@ -78,7 +79,7 @@ public class TestBaseUtils {
 	}
 
 	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers, int
-			taskManagerNumSlots) {
+			taskManagerNumSlots) throws Exception {
 		Configuration config = new Configuration();
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 		config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 1ebe9eb..649e094 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -24,8 +24,10 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
 
-class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
-LocalFlinkMiniCluster(userConfiguration) {
+class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val forNumberString = System.getProperty("forkNumber")
@@ -68,7 +70,7 @@ LocalFlinkMiniCluster(userConfiguration) {
     }
 
     val localExecution = if(numTaskManagers == 1){
-      false
+      true
     }else{
       false
     }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 9046d2d..6928963 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-			cluster = new ForkableFlinkMiniCluster(config);
+			cluster = new ForkableFlinkMiniCluster(config, false);
 
 			RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 104a440..8e0f9c8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -45,7 +45,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => Right(20)
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
     val result = eithers.map{
       _ match {
@@ -68,7 +68,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => Left(20)
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
     val result = eithers.map(_ match {
       case Left(i) => i
@@ -89,7 +89,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => Right(20)
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
     val result = eithers.map(_ match {
       case Right(i) => i
@@ -110,7 +110,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => None
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
 
     val result = eithers.map(_ match {
@@ -133,7 +133,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => Some(20)
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
     val result = eithers.map(_ match {
       case Some(i) => i
@@ -154,7 +154,7 @@ class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestB
       case 2 => None
     })
 
-    val resultPath = tempFolder.newFile().toPath.toUri.toString
+    val resultPath = tempFolder.newFile().toURI.toString
 
     val result = eithers.map(_ match {
       case None => 20

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c93d9eaf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d5a0c0a..3cbfaa7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -662,7 +662,6 @@ under the License.
 						<!-- Build files -->
 						<exclude>**/*.iml</exclude>
 						<exclude>flink-quickstart/**/testArtifact/goal.txt</exclude>
-						<exclude>atlassian-ide-plugin.xml</exclude>
 						<!-- Generated content -->
 						<exclude>**/target/**</exclude>
 						<exclude>docs/_site/**</exclude>