You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/02 09:53:41 UTC
incubator-gearpump git commit: [GEARPUMP-334] Fix Java WordCount DSL example
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 0f5f7221e -> 4ad0ec428
[GEARPUMP-334] Fix Java WordCount DSL example
Author: manuzhang <ow...@gmail.com>
Closes #204 from manuzhang/fix_wc_java_dsl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/4ad0ec42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/4ad0ec42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/4ad0ec42
Branch: refs/heads/master
Commit: 4ad0ec42819d81adeac57b13b772b023a8746aac
Parents: 0f5f722
Author: manuzhang <ow...@gmail.com>
Authored: Wed Aug 2 17:53:15 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 2 17:53:26 2017 +0800
----------------------------------------------------------------------
.../gearpump/cluster/client/ClientContext.scala | 37 ++++++++++++++++----
.../cluster/client/RuntimeEnvironment.scala | 5 ++-
.../cluster/embedded/EmbeddedCluster.scala | 7 +---
.../embedded/EmbeddedRuntimeEnvironemnt.scala | 6 ++--
.../examples/wordcountjava/dsl/WordCount.java | 14 ++++++--
.../gearpump/akkastream/graph/RemoteGraph.scala | 12 +------
.../experiments/storm/main/GearpumpNimbus.scala | 2 +-
.../gearpump/services/MasterService.scala | 2 +-
8 files changed, 52 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index fc8af59..4840120 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -40,14 +40,10 @@ import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}
/**
- * ClientContext is a user facing util to submit/manage an application.
- *
- * TODO: add interface to query master here
+ * ClientContext is a user facing utility to interact with the master.
+ * (e.g. submit/manage an application).
*/
-class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
- def this(config: Config) = {
- this(config, null, null)
- }
+class ClientContext protected(config: Config, sys: ActorSystem, _master: ActorRef) {
private val LOG: Logger = LogUtil.getLogger(getClass)
implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
@@ -178,9 +174,36 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
object ClientContext {
+ /**
+ * Create a [[ClientContext]] which will instantiate an actor system
+ * to interact with the master parsed from `gearpump.cluster.masters`.
+ * The config is loaded from classpath.
+ */
def apply(): ClientContext = apply(ClusterConfig.default())
+ /**
+ * Create a [[ClientContext]] which will instantiate an actor system
+ * to interact with the master parsed from `gearpump.cluster.masters`
+ * through the given config.
+ */
def apply(config: Config): ClientContext = {
RuntimeEnvironment.newClientContext(config)
}
+
+ /**
+ * Create a [[ClientContext]] for the passed in actor system
+ * to interact with the master parsed from `gearpump.cluster.masters`
+ * through the given config.
+ */
+ def apply(config: Config, system: ActorSystem): ClientContext = {
+ new ClientContext(config, system, null)
+ }
+
+ /**
+ * Create a [[ClientContext]] for the passed in actor system
+ * to interact with the given master.
+ */
+ def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+ new ClientContext(config, system, master)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
index e90e73b..cf5842f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -19,6 +19,7 @@
package org.apache.gearpump.cluster.client
import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext
import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
/**
@@ -34,13 +35,15 @@ abstract class RuntimeEnvironment {
*/
class RemoteRuntimeEnvironment extends RuntimeEnvironment {
override def newClientContext(akkaConf: Config): ClientContext = {
- new ClientContext(akkaConf)
+ new RemoteClientContext(akkaConf)
}
}
object RuntimeEnvironment {
private var envInstance: RuntimeEnvironment = _
+ class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, null, null)
+
def get() : RuntimeEnvironment = {
Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index a3a3e39..8abcd96 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -21,12 +21,11 @@ package org.apache.gearpump.cluster.embedded
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
-
import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigValueFactory}
-
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
import org.apache.gearpump.cluster.master.{Master => MasterActor}
import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
@@ -65,10 +64,6 @@ class EmbeddedCluster(inputConfig: Config) {
config
}
- def newClientContext: ClientContext = {
- new ClientContext(config, system, master)
- }
-
def stop(): Unit = {
system.stop(master)
system.terminate()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
index 246fabd..fbea53f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.cluster.embedded
import com.typesafe.config.Config
import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
-import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
/**
* The EmbeddedRuntimeEnvironemnt is initiated when user trying to launch their application
@@ -28,12 +28,12 @@ import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClie
*/
class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment {
override def newClientContext(akkaConf: Config): ClientContext = {
- new LocalClientContext(akkaConf)
+ new EmbeddedClientContext(akkaConf)
}
}
object EmbeddedRuntimeEnvironemnt {
- class LocalClientContext private (cluster: EmbeddedCluster)
+ class EmbeddedClientContext private(cluster: EmbeddedCluster)
extends ClientContext(cluster.config, cluster.system, cluster.master) {
def this(akkaConf: Config) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 2830b16..e8467fa 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
import org.apache.gearpump.streaming.task.TaskContext;
import scala.Tuple2;
@@ -46,7 +47,7 @@ public class WordCount {
}
public static void main(Config akkaConf, String[] args) throws InterruptedException {
- ClientContext context = new ClientContext(akkaConf);
+ ClientContext context = ClientContext.apply(akkaConf);
JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
@@ -69,6 +70,7 @@ public class WordCount {
private static class StringSource implements DataSource {
private final String str;
+ private boolean hasNext = true;
StringSource(String str) {
this.str = str;
@@ -80,7 +82,9 @@ public class WordCount {
@Override
public Message read() {
- return new DefaultMessage(str, Instant.now());
+ Message msg = new DefaultMessage(str, Instant.now());
+ hasNext = false;
+ return msg;
}
@Override
@@ -89,7 +93,11 @@ public class WordCount {
@Override
public Instant getWatermark() {
- return Instant.now();
+ if (hasNext) {
+ return Instant.now();
+ } else {
+ return Watermark.MAX();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 4d400ff..f45cae0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -26,7 +26,6 @@ import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
import akka.stream.impl.StreamLayout.Module
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.util.Graph
@@ -48,16 +47,8 @@ object RemoteGraph {
*/
class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
extends SubGraphMaterializer {
- private val local = if (useInProcessCluster) {
- Some(EmbeddedCluster())
- } else {
- None
- }
- private val context: ClientContext = local match {
- case Some(l) => l.newClientContext
- case None => ClientContext(null)
- }
+ private val context: ClientContext = ClientContext()
override def materialize(subGraph: SubGraph,
inputMatValues: scala.collection.mutable.Map[Module, Any]):
@@ -105,7 +96,6 @@ object RemoteGraph {
override def shutdown: Unit = {
context.close()
- local.foreach(_.stop())
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index e2d421c..987546c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -60,7 +60,7 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
val akkaConf = updateClientConfig(inputAkkaConf)
val system = ActorSystem("storm", akkaConf)
- val clientContext = new ClientContext(akkaConf, system, null)
+ val clientContext = ClientContext(akkaConf, system, null)
val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
val thriftConf: JMap[AnyRef, AnyRef] = Map(
Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 5ba101b..be96577 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -160,7 +160,7 @@ class MasterService(val master: ActorRef,
val msg = java.net.URLDecoder.decode(request, "UTF-8")
val submitApplicationRequest = read[SubmitApplicationRequest](msg)
import submitApplicationRequest.{appName, dag, processors, userConfig}
- val context = new ClientContext(system.settings.config, system, master)
+ val context = ClientContext(system.settings.config, system, master)
val graph = dag.mapVertex { processorId =>
processors(processorId)