You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/02 09:53:41 UTC

incubator-gearpump git commit: [GEARPUMP-334] Fix Java WordCount DSL example

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 0f5f7221e -> 4ad0ec428


[GEARPUMP-334] Fix Java WordCount DSL example

Author: manuzhang <ow...@gmail.com>

Closes #204 from manuzhang/fix_wc_java_dsl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/4ad0ec42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/4ad0ec42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/4ad0ec42

Branch: refs/heads/master
Commit: 4ad0ec42819d81adeac57b13b772b023a8746aac
Parents: 0f5f722
Author: manuzhang <ow...@gmail.com>
Authored: Wed Aug 2 17:53:15 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Aug 2 17:53:26 2017 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/client/ClientContext.scala | 37 ++++++++++++++++----
 .../cluster/client/RuntimeEnvironment.scala     |  5 ++-
 .../cluster/embedded/EmbeddedCluster.scala      |  7 +---
 .../embedded/EmbeddedRuntimeEnvironemnt.scala   |  6 ++--
 .../examples/wordcountjava/dsl/WordCount.java   | 14 ++++++--
 .../gearpump/akkastream/graph/RemoteGraph.scala | 12 +------
 .../experiments/storm/main/GearpumpNimbus.scala |  2 +-
 .../gearpump/services/MasterService.scala       |  2 +-
 8 files changed, 52 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index fc8af59..4840120 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -40,14 +40,10 @@ import scala.concurrent.{Await, Future}
 import scala.util.{Failure, Success, Try}
 
 /**
- * ClientContext is a user facing util to submit/manage an application.
- *
- * TODO: add interface to query master here
+ * ClientContext is a user facing utility to interact with the master.
+ * (e.g. submit/manage an application).
  */
-class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
-  def this(config: Config) = {
-    this(config, null, null)
-  }
+class ClientContext protected(config: Config, sys: ActorSystem, _master: ActorRef) {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
   implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
@@ -178,9 +174,36 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
 
 object ClientContext {
 
+  /**
+   * Create a [[ClientContext]] which will instantiate an actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`.
+   * The config is loaded from classpath.
+   */
   def apply(): ClientContext = apply(ClusterConfig.default())
 
+  /**
+   * Create a [[ClientContext]] which will instantiate an actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`
+   * through the given config.
+   */
   def apply(config: Config): ClientContext = {
     RuntimeEnvironment.newClientContext(config)
   }
+
+  /**
+   * Create a [[ClientContext]] for the passed in actor system
+   * to interact with the master parsed from `gearpump.cluster.masters`
+   * through the given config.
+   */
+  def apply(config: Config, system: ActorSystem): ClientContext = {
+    new ClientContext(config, system, null)
+  }
+
+  /**
+   * Create a [[ClientContext]] for the passed in actor system
+   * to interact with the given master.
+   */
+  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(config, system, master)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
index e90e73b..cf5842f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.gearpump.cluster.client
 
 import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext
 import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
 
 /**
@@ -34,13 +35,15 @@ abstract class RuntimeEnvironment {
  */
 class RemoteRuntimeEnvironment extends RuntimeEnvironment {
   override def newClientContext(akkaConf: Config): ClientContext = {
-    new ClientContext(akkaConf)
+    new RemoteClientContext(akkaConf)
   }
 }
 
 object RuntimeEnvironment {
   private var envInstance: RuntimeEnvironment = _
 
+  class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, null, null)
+
   def get() : RuntimeEnvironment = {
     Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index a3a3e39..8abcd96 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -21,12 +21,11 @@ package org.apache.gearpump.cluster.embedded
 import scala.collection.JavaConverters._
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
-
 import akka.actor.{ActorRef, ActorSystem, Props}
 import com.typesafe.config.{Config, ConfigValueFactory}
-
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
 import org.apache.gearpump.cluster.master.{Master => MasterActor}
 import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
 import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
@@ -65,10 +64,6 @@ class EmbeddedCluster(inputConfig: Config) {
     config
   }
 
-  def newClientContext: ClientContext = {
-    new ClientContext(config, system, master)
-  }
-
   def stop(): Unit = {
     system.stop(master)
     system.terminate()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
index 246fabd..fbea53f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.cluster.embedded
 
 import com.typesafe.config.Config
 import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
-import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
 
 /**
  * The EmbeddedRuntimeEnvironemnt is initiated when user trying to launch their application
@@ -28,12 +28,12 @@ import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClie
  */
 class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment {
   override def newClientContext(akkaConf: Config): ClientContext = {
-    new LocalClientContext(akkaConf)
+    new EmbeddedClientContext(akkaConf)
   }
 }
 
 object EmbeddedRuntimeEnvironemnt {
-  class LocalClientContext private (cluster: EmbeddedCluster)
+  class EmbeddedClientContext private(cluster: EmbeddedCluster)
     extends ClientContext(cluster.config, cluster.system, cluster.master) {
 
     def this(akkaConf: Config) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 2830b16..e8467fa 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
 import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
@@ -46,7 +47,7 @@ public class WordCount {
   }
 
   public static void main(Config akkaConf, String[] args) throws InterruptedException {
-    ClientContext context = new ClientContext(akkaConf);
+    ClientContext context = ClientContext.apply(akkaConf);
     JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
 
     JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
@@ -69,6 +70,7 @@ public class WordCount {
   private static class StringSource implements DataSource {
 
     private final String str;
+    private boolean hasNext = true;
 
     StringSource(String str) {
       this.str = str;
@@ -80,7 +82,9 @@ public class WordCount {
 
     @Override
     public Message read() {
-      return new DefaultMessage(str, Instant.now());
+      Message msg = new DefaultMessage(str, Instant.now());
+      hasNext = false;
+      return msg;
     }
 
     @Override
@@ -89,7 +93,11 @@ public class WordCount {
 
     @Override
     public Instant getWatermark() {
-      return Instant.now();
+      if (hasNext) {
+        return Instant.now();
+      } else {
+        return Watermark.MAX();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 4d400ff..f45cae0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -26,7 +26,6 @@ import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
 import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.util.Graph
 
@@ -48,16 +47,8 @@ object RemoteGraph {
    */
   class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
     extends SubGraphMaterializer {
-    private val local = if (useInProcessCluster) {
-      Some(EmbeddedCluster())
-    } else {
-      None
-    }
 
-    private val context: ClientContext = local match {
-      case Some(l) => l.newClientContext
-      case None => ClientContext(null)
-    }
+    private val context: ClientContext = ClientContext()
 
     override def materialize(subGraph: SubGraph,
         inputMatValues: scala.collection.mutable.Map[Module, Any]):
@@ -105,7 +96,6 @@ object RemoteGraph {
 
     override def shutdown: Unit = {
       context.close()
-      local.foreach(_.stop())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index e2d421c..987546c 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -60,7 +60,7 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
     val akkaConf = updateClientConfig(inputAkkaConf)
     val system = ActorSystem("storm", akkaConf)
 
-    val clientContext = new ClientContext(akkaConf, system, null)
+    val clientContext = ClientContext(akkaConf, system, null)
     val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
     val thriftConf: JMap[AnyRef, AnyRef] = Map(
       Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index 5ba101b..be96577 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -160,7 +160,7 @@ class MasterService(val master: ActorRef,
           val msg = java.net.URLDecoder.decode(request, "UTF-8")
           val submitApplicationRequest = read[SubmitApplicationRequest](msg)
           import submitApplicationRequest.{appName, dag, processors, userConfig}
-          val context = new ClientContext(system.settings.config, system, master)
+          val context = ClientContext(system.settings.config, system, master)
 
           val graph = dag.mapVertex { processorId =>
             processors(processorId)