You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/07/31 06:37:07 UTC
incubator-gearpump git commit: [GEARPUMP-331] Allow applications to
be run in IDE
Repository: incubator-gearpump
Updated Branches:
refs/heads/master ac8ac0392 -> fa3f892d7
[GEARPUMP-331] Allow applications to be run in IDE
Author: huafengw <fv...@gmail.com>
Closes #202 from huafengw/GP331.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fa3f892d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fa3f892d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fa3f892d
Branch: refs/heads/master
Commit: fa3f892d76ca932fae44587f7860030b32f401bf
Parents: ac8ac03
Author: huafengw <fv...@gmail.com>
Authored: Mon Jul 31 14:36:38 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jul 31 14:36:48 2017 +0800
----------------------------------------------------------------------
.../gearpump/cluster/client/ClientContext.scala | 21 ++------
.../cluster/client/RuntimeEnvironment.scala | 55 ++++++++++++++++++++
.../cluster/embedded/EmbeddedCluster.scala | 44 ++++++----------
.../embedded/EmbeddedRuntimeEnvironemnt.scala | 48 +++++++++++++++++
.../gearpump/cluster/main/AppSubmitter.scala | 2 +
.../apache/gearpump/cluster/MasterHarness.scala | 6 ++-
docs/contents/dev/dev-write-1st-app.md | 4 +-
.../examples/wordcountjava/WordCount.java | 30 +----------
.../examples/wordcount/WordCount.scala | 30 +----------
.../examples/wordcount/dsl/WordCount.scala | 6 +--
.../gearpump/akkastream/graph/RemoteGraph.scala | 6 +--
.../gearpump/services/MasterService.scala | 2 +-
12 files changed, 137 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 48b95d8..fc8af59 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -45,17 +45,12 @@ import scala.util.{Failure, Success, Try}
* TODO: add interface to query master here
*/
class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
- def this(system: ActorSystem) = {
- this(system.settings.config, system, null)
- }
-
def this(config: Config) = {
this(config, null, null)
}
private val LOG: Logger = LogUtil.getLogger(getClass)
implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
- LOG.info(s"Starting system ${system.name}")
private val jarStoreClient = new JarStoreClient(config, system)
private val masterClientTimeout = {
val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
@@ -183,19 +178,9 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
object ClientContext {
- def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
-
- def apply(system: ActorSystem): ClientContext = {
- new ClientContext(ClusterConfig.default(), system, null)
- }
-
- def apply(system: ActorSystem, master: ActorRef): ClientContext = {
- new ClientContext(ClusterConfig.default(), system, master)
- }
-
- def apply(config: Config): ClientContext = new ClientContext(config, null, null)
+ def apply(): ClientContext = apply(ClusterConfig.default())
- def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
- new ClientContext(config, system, master)
+ def apply(config: Config): ClientContext = {
+ RuntimeEnvironment.newClientContext(config)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
new file mode 100644
index 0000000..e90e73b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
+
+/**
+ * The RuntimeEnvironment is the context that decides where an application is submitted to.
+ */
+abstract class RuntimeEnvironment {
+ def newClientContext(akkaConf: Config): ClientContext
+}
+
+/**
+ * Usually RemoteRuntimeEnvironment is the default environment when the user uses the command line
+ * to submit an application. It will connect to the right remote Master.
+ */
+class RemoteRuntimeEnvironment extends RuntimeEnvironment {
+ override def newClientContext(akkaConf: Config): ClientContext = {
+ new ClientContext(akkaConf)
+ }
+}
+
+object RuntimeEnvironment {
+ private var envInstance: RuntimeEnvironment = _
+
+ def get() : RuntimeEnvironment = {
+ Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
+ }
+
+ def newClientContext(akkaConf: Config): ClientContext = {
+ get().newClientContext(akkaConf)
+ }
+
+ def setRuntimeEnv(env: RuntimeEnvironment): Unit = {
+ envInstance = env
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index 9bde4d1..a3a3e39 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -36,34 +36,22 @@ import org.apache.gearpump.util.{LogUtil, Util}
* Create a in-process cluster with single worker
*/
class EmbeddedCluster(inputConfig: Config) {
-
- private val workerCount: Int = 1
- private var _master: ActorRef = null
- private var _system: ActorSystem = null
- private var _config: Config = null
-
private val LOG = LogUtil.getLogger(getClass)
+ private val workerCount: Int = 1
+ private val port = Util.findFreePort().get
+ private[embedded] val config: Config = getConfig(inputConfig, port)
+ private[embedded] val system: ActorSystem = ActorSystem(MASTER, config)
+ private[embedded] val master: ActorRef = system.actorOf(Props[MasterActor], MASTER)
- def start(): Unit = {
- val port = Util.findFreePort().get
- val akkaConf = getConfig(inputConfig, port)
- _config = akkaConf
- val system = ActorSystem(MASTER, akkaConf)
-
- val master = system.actorOf(Props[MasterActor], MASTER)
-
- 0.until(workerCount).foreach { id =>
- system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
- }
- this._master = master
- this._system = system
-
- LOG.info("=================================")
- LOG.info("Local Cluster is started at: ")
- LOG.info(s" 127.0.0.1:$port")
- LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+ 0.until(workerCount).foreach { id =>
+ system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
}
+ LOG.info("=================================")
+ LOG.info("Local Cluster is started at: ")
+ LOG.info(s" 127.0.0.1:$port")
+ LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+
private def getConfig(inputConfig: Config, port: Int): Config = {
val config = inputConfig.
withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
@@ -78,13 +66,13 @@ class EmbeddedCluster(inputConfig: Config) {
}
def newClientContext: ClientContext = {
- ClientContext(_config, _system, _master)
+ new ClientContext(config, system, master)
}
def stop(): Unit = {
- _system.stop(_master)
- _system.terminate()
- Await.result(_system.whenTerminated, Duration.Inf)
+ system.stop(master)
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
new file mode 100644
index 0000000..246fabd
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.embedded
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext
+
+/**
+ * The EmbeddedRuntimeEnvironemnt is initiated when a user launches their application
+ * from an IDE. It will create an embedded cluster and the user's application will run in a single
+ * local process.
+ */
+class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment {
+ override def newClientContext(akkaConf: Config): ClientContext = {
+ new LocalClientContext(akkaConf)
+ }
+}
+
+object EmbeddedRuntimeEnvironemnt {
+ class LocalClientContext private (cluster: EmbeddedCluster)
+ extends ClientContext(cluster.config, cluster.system, cluster.master) {
+
+ def this(akkaConf: Config) {
+ this(new EmbeddedCluster(akkaConf))
+ }
+
+ override def close(): Unit = {
+ super.close()
+ cluster.stop()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index 79f31eb..c5ddcb0 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
import scala.util.{Failure, Success, Try}
@@ -84,6 +85,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
Thread.currentThread().setContextClassLoader(classLoader)
val clazz = classLoader.loadClass(main)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
mainMethod.invoke(null, arguments)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
index 1ec5660..654da4b 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -22,15 +22,15 @@ import java.io.File
import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException}
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
+
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
-
import akka.actor.{Actor, ActorSystem, Address, Props}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
-
import org.apache.gearpump.cluster.MasterHarness.MockMaster
+import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil}
@@ -63,6 +63,8 @@ trait MasterHarness {
masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
LOG.info(s"Actor system is started, $host, $port")
+ // Make sure no EmbeddedCluster is created; otherwise the mock master won't work
+ RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment)
}
def shutdownActorSystem(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/docs/contents/dev/dev-write-1st-app.md
----------------------------------------------------------------------
diff --git a/docs/contents/dev/dev-write-1st-app.md b/docs/contents/dev/dev-write-1st-app.md
index 6234577..4ef1cc3 100644
--- a/docs/contents/dev/dev-write-1st-app.md
+++ b/docs/contents/dev/dev-write-1st-app.md
@@ -20,8 +20,8 @@ We'll use the classical [wordcount](https://github.com/apache/incubator-gearpump
// (word, count1), (word, count2) => (word, count1 + count2)
groupByKey().sum.log
- context.submit(app).waitUntilFinish()
- context.close()
+ context.submit(app).waitUntilFinish()
+ context.close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index 5e3d472..cc6cd02 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -37,7 +37,6 @@ public class WordCount {
}
public static void main(Config akkaConf, String[] args) throws InterruptedException {
-
// For split task, we config to create two tasks
int splitTaskNumber = 2;
Processor split = new Processor(Split.class).withParallelism(splitTaskNumber);
@@ -56,36 +55,9 @@ public class WordCount {
UserConfig conf = UserConfig.empty();
StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
-
- EmbeddedCluster localCluster = null;
-
- Boolean debugMode = System.getProperty("DEBUG") != null;
-
- if (debugMode) {
- localCluster = new EmbeddedCluster(akkaConf);
- localCluster.start();
- }
-
- ClientContext masterClient = null;
-
- if (localCluster != null) {
- masterClient = localCluster.newClientContext();
- } else {
- // create master client
- // It will read the master settings under gearpump.cluster.masters
- masterClient = new ClientContext(akkaConf);
- }
-
+ ClientContext masterClient = ClientContext.apply(akkaConf);
masterClient.submit(app);
- if (debugMode) {
- Thread.sleep(30 * 1000); // sleep for 30 seconds.
- }
-
masterClient.close();
-
- if (localCluster != null) {
- localCluster.stop();
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 0e3d840..14965e7 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.examples.wordcount
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.streaming.partitioner.HashPartitioner
import org.apache.gearpump.streaming.source.DataSourceProcessor
@@ -38,10 +37,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
override val options: Array[(String, CLIOption[Any])] = Array(
"split" -> CLIOption[Int]("<how many source tasks>", required = false,
defaultValue = Some(1)),
- "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
- "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
- "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false,
- defaultValue = Some(30))
+ "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
)
def application(config: ParseResult, system: ActorSystem): StreamApplication = {
@@ -60,32 +56,10 @@ object WordCount extends AkkaApp with ArgumentsParser {
override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
-
- val debugMode = config.getBoolean("debug")
- val sleepSeconds = config.getInt("sleep")
-
- val localCluster = if (debugMode) {
- val cluster = new EmbeddedCluster(akkaConf: Config)
- cluster.start()
- Some(cluster)
- } else {
- None
- }
-
- val context: ClientContext = localCluster match {
- case Some(local) => local.newClientContext
- case None => ClientContext(akkaConf)
- }
-
+ val context: ClientContext = ClientContext(akkaConf)
val app = application(config, context.system)
context.submit(app)
-
- if (debugMode) {
- Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging.
- }
-
context.close()
- localCluster.map(_.stop())
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 65f63d2..d10dbe7 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -19,7 +19,6 @@
package org.apache.gearpump.streaming.examples.wordcount.dsl
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
@@ -31,9 +30,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
override val options: Array[(String, CLIOption[Any])] = Array.empty
override def main(akkaConf: Config, args: Array[String]): Unit = {
- val cluster = new EmbeddedCluster(akkaConf)
- cluster.start()
- val context: ClientContext = cluster.newClientContext
+ val context: ClientContext = ClientContext(akkaConf)
val app = StreamApp("dsl", context)
val data = "This is a good start, bingo!! bingo!!"
app.source(data.lines.toList, 1, "source").
@@ -44,6 +41,5 @@ object WordCount extends AkkaApp with ArgumentsParser {
context.submit(app).waitUntilFinish()
context.close()
- cluster.stop()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 99ebe17..4d400ff 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -49,16 +49,14 @@ object RemoteGraph {
class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
extends SubGraphMaterializer {
private val local = if (useInProcessCluster) {
- val cluster = EmbeddedCluster()
- cluster.start()
- Some(cluster)
+ Some(EmbeddedCluster())
} else {
None
}
private val context: ClientContext = local match {
case Some(l) => l.newClientContext
- case None => ClientContext(system)
+ case None => ClientContext(null)
}
override def materialize(subGraph: SubGraph,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fa3f892d/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index be96577..5ba101b 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -160,7 +160,7 @@ class MasterService(val master: ActorRef,
val msg = java.net.URLDecoder.decode(request, "UTF-8")
val submitApplicationRequest = read[SubmitApplicationRequest](msg)
import submitApplicationRequest.{appName, dag, processors, userConfig}
- val context = ClientContext(system.settings.config, system, master)
+ val context = new ClientContext(system.settings.config, system, master)
val graph = dag.mapVertex { processorId =>
processors(processorId)