You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/07/31 06:37:07 UTC

incubator-gearpump git commit: [GEARPUMP-331] Allow applications to be run in IDE

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master ac8ac0392 -> fa3f892d7


[GEARPUMP-331] Allow applications to be run in IDE

Author: huafengw <fv...@gmail.com>

Closes #202 from huafengw/GP331.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fa3f892d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fa3f892d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fa3f892d

Branch: refs/heads/master
Commit: fa3f892d76ca932fae44587f7860030b32f401bf
Parents: ac8ac03
Author: huafengw <fv...@gmail.com>
Authored: Mon Jul 31 14:36:38 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jul 31 14:36:48 2017 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/client/ClientContext.scala | 21 ++------
 .../cluster/client/RuntimeEnvironment.scala     | 55 ++++++++++++++++++++
 .../cluster/embedded/EmbeddedCluster.scala      | 44 ++++++----------
 .../embedded/EmbeddedRuntimeEnvironemnt.scala   | 48 +++++++++++++++++
 .../gearpump/cluster/main/AppSubmitter.scala    |  2 +
 .../apache/gearpump/cluster/MasterHarness.scala |  6 ++-
 docs/contents/dev/dev-write-1st-app.md          |  4 +-
 .../examples/wordcountjava/WordCount.java       | 30 +----------
 .../examples/wordcount/WordCount.scala          | 30 +----------
 .../examples/wordcount/dsl/WordCount.scala      |  6 +--
 .../gearpump/akkastream/graph/RemoteGraph.scala |  6 +--
 .../gearpump/services/MasterService.scala       |  2 +-
 12 files changed, 137 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 48b95d8..fc8af59 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -45,17 +45,12 @@ import scala.util.{Failure, Success, Try}
  * TODO: add interface to query master here
  */
 class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-  def this(system: ActorSystem) = {
-    this(system.settings.config, system, null)
-  }
-
   def this(config: Config) = {
     this(config, null, null)
   }
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
   implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
-  LOG.info(s"Starting system ${system.name}")
   private val jarStoreClient = new JarStoreClient(config, system)
   private val masterClientTimeout = {
     val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
@@ -183,19 +178,9 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
 
 object ClientContext {
 
-  def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
-
-  def apply(system: ActorSystem): ClientContext = {
-    new ClientContext(ClusterConfig.default(), system, null)
-  }
-
-  def apply(system: ActorSystem, master: ActorRef): ClientContext = {
-    new ClientContext(ClusterConfig.default(), system, master)
-  }
-
-  def apply(config: Config): ClientContext = new ClientContext(config, null, null)
+  def apply(): ClientContext = apply(ClusterConfig.default())
 
-  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
-    new ClientContext(config, system, master)
+  def apply(config: Config): ClientContext = {
+    RuntimeEnvironment.newClientContext(config)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
new file mode 100644
index 0000000..e90e73b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
+
+/**
+ * The RuntimeEnvironment is the context that decides where an application is submitted to.
+ */
+abstract class RuntimeEnvironment {
+  def newClientContext(akkaConf: Config): ClientContext
+}
+
+/**
+ * Usually RemoteRuntimeEnvironment is the default environment when a user uses the command line
+ * to submit an application. It will connect to the right remote Master.
+ */
+class RemoteRuntimeEnvironment extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new ClientContext(akkaConf)
+  }
+}
+
+object RuntimeEnvironment {
+  private var envInstance: RuntimeEnvironment = _
+
+  def get() : RuntimeEnvironment = {
+    Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
+  }
+
+  def newClientContext(akkaConf: Config): ClientContext = {
+    get().newClientContext(akkaConf)
+  }
+
+  def setRuntimeEnv(env: RuntimeEnvironment): Unit = {
+    envInstance = env
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index 9bde4d1..a3a3e39 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -36,34 +36,22 @@ import org.apache.gearpump.util.{LogUtil, Util}
  * Create a in-process cluster with single worker
  */
 class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
   private val LOG = LogUtil.getLogger(getClass)
+  private val workerCount: Int = 1
+  private val port = Util.findFreePort().get
+  private[embedded] val config: Config = getConfig(inputConfig, port)
+  private[embedded] val system: ActorSystem = ActorSystem(MASTER, config)
+  private[embedded] val master: ActorRef = system.actorOf(Props[MasterActor], MASTER)
 
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  0.until(workerCount).foreach { id =>
+    system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
   }
 
+  LOG.info("=================================")
+  LOG.info("Local Cluster is started at: ")
+  LOG.info(s"                 127.0.0.1:$port")
+  LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+
   private def getConfig(inputConfig: Config, port: Int): Config = {
     val config = inputConfig.
       withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
@@ -78,13 +66,13 @@ class EmbeddedCluster(inputConfig: Config) {
   }
 
   def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
+    new ClientContext(config, system, master)
   }
 
   def stop(): Unit = {
-    _system.stop(_master)
-    _system.terminate()
-    Await.result(_system.whenTerminated, Duration.Inf)
+    system.stop(master)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
new file mode 100644
index 0000000..246fabd
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.embedded
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext
+
+/**
+ * The EmbeddedRuntimeEnvironemnt is initiated when a user tries to launch their application
+ * from an IDE. It will create an embedded cluster and the user's application will run in a
+ * single local process.
+ */
+class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new LocalClientContext(akkaConf)
+  }
+}
+
+object EmbeddedRuntimeEnvironemnt {
+  class LocalClientContext private (cluster: EmbeddedCluster)
+    extends ClientContext(cluster.config, cluster.system, cluster.master) {
+
+    def this(akkaConf: Config) {
+      this(new EmbeddedCluster(akkaConf))
+    }
+
+    override def close(): Unit = {
+      super.close()
+      cluster.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index 79f31eb..c5ddcb0 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.util.jar.JarFile
 
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
 import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
 
 import scala.util.{Failure, Success, Try}
@@ -84,6 +85,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
       Thread.currentThread().setContextClassLoader(classLoader)
       val clazz = classLoader.loadClass(main)
       val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
       mainMethod.invoke(null, arguments)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
index 1ec5660..654da4b 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -22,15 +22,15 @@ import java.io.File
 import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException}
 import java.util.Properties
 import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext}
-
 import akka.actor.{Actor, ActorSystem, Address, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
-
 import org.apache.gearpump.cluster.MasterHarness.MockMaster
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
 import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil}
 
@@ -63,6 +63,8 @@ trait MasterHarness {
     masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
 
     LOG.info(s"Actor system is started, $host, $port")
+    // Make sure there will be no EmbeddedCluster created, otherwise mock master won't work
+    RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
   }
 
   def shutdownActorSystem(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/docs/contents/dev/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/contents/dev/dev-write-1st-app.md b/docs/contents/dev/dev-write-1st-app.md
index 6234577..4ef1cc3 100644
--- a/docs/contents/dev/dev-write-1st-app.md
+++ b/docs/contents/dev/dev-write-1st-app.md
@@ -20,8 +20,8 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
 	      // (word, count1), (word, count2) => (word, count1 + count2)
 	      groupByKey().sum.log
 	
-         context.submit(app).waitUntilFinish()
-         context.close()
+        context.submit(app).waitUntilFinish()
+        context.close()
 	  }
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index 5e3d472..cc6cd02 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -37,7 +37,6 @@ public class WordCount {
   }
 
   public static void main(Config akkaConf, String[] args) throws InterruptedException {
-
     // For split task, we config to create two tasks
     int splitTaskNumber = 2;
     Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
@@ -56,36 +55,9 @@ public class WordCount {
 
     UserConfig conf = UserConfig.empty();
     StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
-
-    EmbeddedCluster localCluster = null;
-
-    Boolean debugMode = System.getProperty("DEBUG") != null;
-
-    if (debugMode) {
-      localCluster = new EmbeddedCluster(akkaConf);
-      localCluster.start();
-    }
-
-    ClientContext masterClient = null;
-
-    if (localCluster != null) {
-      masterClient = localCluster.newClientContext();
-    } else {
-      // create master client
-      // It will read the master settings under gearpump.cluster.masters
-      masterClient = new ClientContext(akkaConf);
-    }
-
+    ClientContext masterClient = ClientContext.apply(akkaConf);
     masterClient.submit(app);
 
-    if (debugMode) {
-      Thread.sleep(30 * 1000); // sleep for 30 seconds.
-    }
-
     masterClient.close();
-
-    if (localCluster != null) {
-      localCluster.stop();
-    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 0e3d840..14965e7 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.examples.wordcount
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.source.DataSourceProcessor
@@ -38,10 +37,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     "split" -> CLIOption[Int]("<how many source tasks>", required = false,
       defaultValue = Some(1)),
-    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
-    "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
-    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false,
-      defaultValue = Some(30))
+    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
   )
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication = {
@@ -60,32 +56,10 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
-
-    val debugMode = config.getBoolean("debug")
-    val sleepSeconds = config.getInt("sleep")
-
-    val localCluster = if (debugMode) {
-      val cluster = new EmbeddedCluster(akkaConf: Config)
-      cluster.start()
-      Some(cluster)
-    } else {
-      None
-    }
-
-    val context: ClientContext = localCluster match {
-      case Some(local) => local.newClientContext
-      case None => ClientContext(akkaConf)
-    }
-
+    val context: ClientContext = ClientContext(akkaConf)
     val app = application(config, context.system)
     context.submit(app)
-
-    if (debugMode) {
-      Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging.
-    }
-
     context.close()
-    localCluster.map(_.stop())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 65f63d2..d10dbe7 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -19,7 +19,6 @@
 package org.apache.gearpump.streaming.examples.wordcount.dsl
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
@@ -31,9 +30,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array.empty
 
   override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val cluster = new EmbeddedCluster(akkaConf)
-    cluster.start()
-    val context: ClientContext = cluster.newClientContext
+    val context: ClientContext = ClientContext(akkaConf)
     val app = StreamApp("dsl", context)
     val data = "This is a good start, bingo!! bingo!!"
     app.source(data.lines.toList, 1, "source").
@@ -44,6 +41,5 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
     context.submit(app).waitUntilFinish()
     context.close()
-    cluster.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 99ebe17..4d400ff 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -49,16 +49,14 @@ object RemoteGraph {
   class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
     extends SubGraphMaterializer {
     private val local = if (useInProcessCluster) {
-      val cluster = EmbeddedCluster()
-      cluster.start()
-      Some(cluster)
+      Some(EmbeddedCluster())
     } else {
       None
     }
 
     private val context: ClientContext = local match {
       case Some(l) => l.newClientContext
-      case None => ClientContext(system)
+      case None => ClientContext(null)
     }
 
     override def materialize(subGraph: SubGraph,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index be96577..5ba101b 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -160,7 +160,7 @@ class MasterService(val master: ActorRef,
           val msg = java.net.URLDecoder.decode(request, "UTF-8")
           val submitApplicationRequest = read[SubmitApplicationRequest](msg)
           import submitApplicationRequest.{appName, dag, processors, userConfig}
-          val context = ClientContext(system.settings.config, system, master)
+          val context = new ClientContext(system.settings.config, system, master)
 
           val graph = dag.mapVertex { processorId =>
             processors(processorId)