You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:12 UTC

[02/49] incubator-gearpump git commit: fix #2007, add Java DSL

fix #2007, add Java DSL


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/95d3c613
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/95d3c613
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/95d3c613

Branch: refs/heads/master
Commit: 95d3c6138eba36d2f1ae169a570cd1abed623a13
Parents: ef11f16
Author: manuzhang <ow...@gmail.com>
Authored: Fri Mar 18 12:45:22 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:22:20 2016 +0800

----------------------------------------------------------------------
 .../examples/wordcountjava/dsl/WordCount.java   | 86 ++++++++++++++++++++
 .../examples/wordcount/dsl/WordCount.scala      | 44 ++++++++++
 .../javaapi/dsl/functions/FilterFunction.java   | 28 +++++++
 .../javaapi/dsl/functions/FlatMapFunction.java  | 29 +++++++
 .../javaapi/dsl/functions/GroupByFunction.java  | 28 +++++++
 .../javaapi/dsl/functions/MapFunction.java      | 28 +++++++
 .../javaapi/dsl/functions/ReduceFunction.java   | 28 +++++++
 .../streaming/dsl/example/WordCount.scala       | 44 ----------
 .../streaming/dsl/javaapi/JavaStream.scala      | 64 +++++++++++++++
 .../streaming/dsl/javaapi/JavaStreamApp.scala   | 49 +++++++++++
 10 files changed, 384 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
new file mode 100644
index 0000000..6857017
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcountjava.dsl;
+
+import com.typesafe.config.Config;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class WordCount {
+
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+    JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+    List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
+
+    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> apply(String s) {
+        return Lists.newArrayList(s.split("\\s+")).iterator();
+      }
+    }, "flatMap");
+
+    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+      @Override
+      public Tuple2<String, Integer> apply(String s) {
+        return new Tuple2<String, Integer>(s, 1);
+      }
+    }, "map");
+
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String,Integer>, String>() {
+      @Override
+      public String apply(Tuple2<String, Integer> tuple) {
+        return tuple._1();
+      }
+    }, 1, "groupBy");
+
+    JavaStream<Tuple2<String, Integer>> wordcount =groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+      @Override
+      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+      }
+    }, "reduce");
+
+    wordcount.log();
+
+    app.run();
+    context.close();
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
new file mode 100644
index 0000000..cc516db
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcount.dsl
+
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.streaming.dsl.StreamApp
+import io.gearpump.streaming.dsl.StreamApp._
+import io.gearpump.util.AkkaApp
+
+object WordCount extends AkkaApp with ArgumentsParser{
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMap(line => line.split("[\\s]+")).map((_, 1)).
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    val appId = context.submit(app)
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
new file mode 100644
index 0000000..e4e137f
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * a function that decides whether to reserve a value<T>
+ */
+public interface FilterFunction<T> extends Serializable {
+  boolean apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
new file mode 100644
index 0000000..b65a338
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * a function that converts a value<T> to a iterator of value<R>
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+  Iterator<R> apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
new file mode 100644
index 0000000..651c477
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * a function that puts a value<T> into a Group
+ */
+public interface GroupByFunction<T, Group> extends Serializable {
+  Group apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
new file mode 100644
index 0000000..a30a671
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * a function that converts a value<T> to value<R>
+ */
+public interface MapFunction<T, R> extends Serializable {
+  R apply(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
new file mode 100644
index 0000000..0f4bb18
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * a function that applies reduce operation
+ */
+public interface ReduceFunction<T> extends Serializable {
+  T apply(T t1, T t2);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
deleted file mode 100644
index 6e168a5..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.dsl.example
-
-import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import StreamApp._
-import com.typesafe.config.Config
-import io.gearpump.util.AkkaApp
-
-object WordCount extends AkkaApp with ArgumentsParser{
-
-  override val options: Array[(String, CLIOption[Any])] = Array.empty
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val context = ClientContext(akkaConf)
-    val app = StreamApp("dsl", context)
-    val data = "This is a good start, bingo!! bingo!!"
-    app.source(data.lines.toList, 1, "source").
-      // word => (word, count)
-      flatMap(line => line.split("[\\s]+")).map((_, 1)).
-      // (word, count1), (word, count2) => (word, count1 + count2)
-      groupByKey().sum.log
-
-    val appId = context.submit(app)
-    context.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..42d6ca7
--- /dev/null
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.dsl.Stream
+import io.gearpump.streaming.javaapi.dsl.functions._
+import io.gearpump.streaming.task.Task
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java DSL
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap({t: T => fn(t).asScala}, description))
+  }
+
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.map({t: T => fn(t)}, description))
+  }
+
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.filter({t: T => fn(t)}, description))
+  }
+
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.reduce({(t1: T, t2: T) => fn(t1, t2)}, description))
+  }
+
+  def log(): Unit = {
+    stream.log()
+  }
+
+  def merge(other: JavaStream[T], description: String = null): JavaStream[T] = {
+    new JavaStream[T](stream.merge(other.stream, description))
+  }
+
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description))
+  }
+
+  def process[R](processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String): JavaStream[R] = {
+    new JavaStream[R](stream.process(processor, parallelism, conf, description))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
new file mode 100644
index 0000000..0ad03cd
--- /dev/null
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import java.util.Collection
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import io.gearpump.streaming.source.DataSource
+
+import scala.collection.JavaConverters._
+
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  def source[T](collection: Collection[T], parallelism: Int,
+                conf: UserConfig, description: String): JavaStream[T] = {
+    val dataSource = new CollectionDataSource(collection.asScala.toSeq)
+    source(dataSource, parallelism, conf, description)
+  }
+
+  def source[T](dataSource: DataSource, parallelism: Int,
+                conf: UserConfig, description: String): JavaStream[T] = {
+    new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
+  }
+
+  def run(): Unit = {
+    context.submit(streamApp)
+  }
+
+}