You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:12 UTC
[02/49] incubator-gearpump git commit: fix #2007, add Java DSL
fix #2007, add Java DSL
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/95d3c613
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/95d3c613
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/95d3c613
Branch: refs/heads/master
Commit: 95d3c6138eba36d2f1ae169a570cd1abed623a13
Parents: ef11f16
Author: manuzhang <ow...@gmail.com>
Authored: Fri Mar 18 12:45:22 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:22:20 2016 +0800
----------------------------------------------------------------------
.../examples/wordcountjava/dsl/WordCount.java | 86 ++++++++++++++++++++
.../examples/wordcount/dsl/WordCount.scala | 44 ++++++++++
.../javaapi/dsl/functions/FilterFunction.java | 28 +++++++
.../javaapi/dsl/functions/FlatMapFunction.java | 29 +++++++
.../javaapi/dsl/functions/GroupByFunction.java | 28 +++++++
.../javaapi/dsl/functions/MapFunction.java | 28 +++++++
.../javaapi/dsl/functions/ReduceFunction.java | 28 +++++++
.../streaming/dsl/example/WordCount.scala | 44 ----------
.../streaming/dsl/javaapi/JavaStream.scala | 64 +++++++++++++++
.../streaming/dsl/javaapi/JavaStreamApp.scala | 49 +++++++++++
10 files changed, 384 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
new file mode 100644
index 0000000..6857017
--- /dev/null
+++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcountjava.dsl;
+
+import com.typesafe.config.Config;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Word count example written against the Java DSL.
+ *
+ * <p>The pipeline splits one hard-coded sentence on whitespace, pairs every
+ * word with a count of one, groups the pairs by word, reduces the grouped
+ * pairs by adding their counts and logs the resulting stream.
+ */
+public class WordCount {
+
+  /**
+   * Runs the example with {@link ClusterConfig#defaultConfig()}.
+   *
+   * @param args command line arguments, passed through unchanged
+   * @throws InterruptedException kept in the signature for existing callers
+   */
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  /**
+   * Builds the word count stream and submits it through a new
+   * {@link ClientContext}, which is always closed before returning.
+   *
+   * @param akkaConf configuration used to create the {@link ClientContext}
+   * @param args command line arguments; not read by this method
+   * @throws InterruptedException kept in the signature for existing callers
+   */
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+    try {
+      JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+      List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
+
+      JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+      // sentence => words
+      JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+        @Override
+        public Iterator<String> apply(String s) {
+          return Lists.newArrayList(s.split("\\s+")).iterator();
+        }
+      }, "flatMap");
+
+      // word => (word, 1)
+      JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> apply(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }, "map");
+
+      // the word itself is the group key
+      JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
+        @Override
+        public String apply(Tuple2<String, Integer> tuple) {
+          return tuple._1();
+        }
+      }, 1, "groupBy");
+
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+          return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+        }
+      }, "reduce");
+
+      wordcount.log();
+
+      app.run();
+    } finally {
+      // Close the client even when building or submitting the app throws,
+      // so the context is never left open on the failure path.
+      context.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
new file mode 100644
index 0000000..cc516db
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcount.dsl
+
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.streaming.dsl.StreamApp
+import io.gearpump.streaming.dsl.StreamApp._
+import io.gearpump.util.AkkaApp
+
+/**
+ * Word count example written against the Scala DSL.
+ *
+ * The pipeline splits one hard-coded sentence on whitespace, pairs every word
+ * with a count of one, groups by key, sums the counts and logs the result.
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  // This example defines no command line options.
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  /**
+   * Builds the word count stream and submits it through a new client context,
+   * which is always closed before returning.
+   *
+   * @param akkaConf configuration used to create the client context
+   * @param args command line arguments; not read by this method
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    try {
+      val app = StreamApp("dsl", context)
+      val data = "This is a good start, bingo!! bingo!!"
+      app.source(data.lines.toList, 1, "source").
+        // word => (word, count)
+        flatMap(line => line.split("[\\s]+")).map((_, 1)).
+        // (word, count1), (word, count2) => (word, count1 + count2)
+        groupByKey().sum.log
+
+      // The value returned by submit was bound to an unused local before; it is not needed here.
+      context.submit(app)
+    } finally {
+      // Close the client even when building or submitting the app throws.
+      context.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
new file mode 100644
index 0000000..e4e137f
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A predicate over values of type {@code T}, used by the filter operation of
+ * the Java DSL to decide whether a value is kept.
+ *
+ * <p>{@code apply} receives one value and returns the decision as a
+ * {@code boolean}. Presumably {@code true} keeps the value, following the
+ * usual filter convention; confirm against the underlying stream's filter.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be
+ * serializable.
+ *
+ * @param <T> type of the values being tested
+ */
+public interface FilterFunction<T> extends Serializable {
+ boolean apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
new file mode 100644
index 0000000..b65a338
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A function that converts one value of type {@code T} into an iterator over
+ * values of type {@code R}, used by the flatMap operation of the Java DSL.
+ *
+ * <p>{@code apply} receives one input value and returns the iterator of
+ * results produced for it.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be
+ * serializable.
+ *
+ * @param <T> type of the input values
+ * @param <R> type of the values yielded by the returned iterator
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+ Iterator<R> apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
new file mode 100644
index 0000000..651c477
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that assigns a value of type {@code T} to a group, used by the
+ * groupBy operation of the Java DSL.
+ *
+ * <p>{@code apply} receives one value and returns the group it belongs to.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be
+ * serializable.
+ *
+ * @param <T> type of the values being grouped
+ * @param <Group> type of the group returned for each value
+ */
+public interface GroupByFunction<T, Group> extends Serializable {
+ Group apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
new file mode 100644
index 0000000..a30a671
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that converts a value of type {@code T} into a value of type
+ * {@code R}, used by the map operation of the Java DSL.
+ *
+ * <p>{@code apply} receives one input value and returns the converted value.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be
+ * serializable.
+ *
+ * @param <T> type of the input values
+ * @param <R> type of the converted values
+ */
+public interface MapFunction<T, R> extends Serializable {
+ R apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
new file mode 100644
index 0000000..0f4bb18
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that combines two values of type {@code T} into a single value
+ * of the same type, used by the reduce operation of the Java DSL.
+ *
+ * <p>{@code apply} receives the two values to combine and returns the result.
+ *
+ * <p>The interface extends {@link Serializable}, so implementations must be
+ * serializable.
+ *
+ * @param <T> type of both the inputs and the combined result
+ */
+public interface ReduceFunction<T> extends Serializable {
+ T apply(T t1, T t2);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
deleted file mode 100644
index 6e168a5..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.dsl.example
-
-import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import StreamApp._
-import com.typesafe.config.Config
-import io.gearpump.util.AkkaApp
-
-object WordCount extends AkkaApp with ArgumentsParser{
-
- override val options: Array[(String, CLIOption[Any])] = Array.empty
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val context = ClientContext(akkaConf)
- val app = StreamApp("dsl", context)
- val data = "This is a good start, bingo!! bingo!!"
- app.source(data.lines.toList, 1, "source").
- // word => (word, count)
- flatMap(line => line.split("[\\s]+")).map((_, 1)).
- // (word, count1), (word, count2) => (word, count1 + count2)
- groupByKey().sum.log
-
- val appId = context.submit(app)
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..42d6ca7
--- /dev/null
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.dsl.Stream
+import io.gearpump.streaming.javaapi.dsl.functions._
+import io.gearpump.streaming.task.Task
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java-facing wrapper around a DSL [[Stream]].
+ *
+ * Every operation adapts the Java function interface it receives to a Scala
+ * function, delegates to the wrapped stream and wraps the returned stream in
+ * a new JavaStream.
+ *
+ * @param stream the wrapped Scala stream
+ * @tparam T element type of the stream
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /** Delegates to `stream.flatMap`; the Java iterator returned by `fn` is viewed as a Scala iterator. */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] =
+    new JavaStream[R](stream.flatMap((in: T) => fn.apply(in).asScala, description))
+
+  /** Delegates to `stream.map`, applying `fn` to each element. */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] =
+    new JavaStream[R](stream.map((in: T) => fn.apply(in), description))
+
+  /** Delegates to `stream.filter`, using `fn` as the predicate. */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] =
+    new JavaStream[T](stream.filter((in: T) => fn.apply(in), description))
+
+  /** Delegates to `stream.reduce`, using `fn` to combine two elements. */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] =
+    new JavaStream[T](stream.reduce((left: T, right: T) => fn.apply(left, right), description))
+
+  /** Delegates to `stream.log()`. */
+  def log(): Unit = stream.log()
+
+  /**
+   * Delegates to `stream.merge` with the stream wrapped by `other`.
+   *
+   * `description` defaults to null. Scala default arguments are not available
+   * at Java call sites, so Java callers pass it explicitly.
+   */
+  def merge(other: JavaStream[T], description: String = null): JavaStream[T] =
+    new JavaStream[T](stream.merge(other.stream, description))
+
+  /** Delegates to `stream.groupBy`; `fn` returns the group of each element and the element type stays `T`. */
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String): JavaStream[T] =
+    new JavaStream[T](stream.groupBy((in: T) => fn.apply(in), parallelism, description))
+
+  /**
+   * Delegates to `stream.process` with the given task class.
+   *
+   * `R` appears only in the result type, so the caller chooses the element
+   * type of the returned stream.
+   */
+  def process[R](processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String): JavaStream[R] =
+    new JavaStream[R](stream.process(processor, parallelism, conf, description))
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
new file mode 100644
index 0000000..0ad03cd
--- /dev/null
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import java.util.Collection
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import io.gearpump.streaming.source.DataSource
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java-facing entry point of the DSL. Wraps a [[StreamApp]] created from the
+ * given name, client context and user config.
+ *
+ * @param name name passed to the wrapped StreamApp
+ * @param context client context used to create and submit the app
+ * @param userConfig user config passed to the wrapped StreamApp
+ */
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  /**
+   * Creates a source stream from a Java collection by wrapping its elements
+   * in a [[CollectionDataSource]] and delegating to the DataSource overload.
+   */
+  def source[T](collection: Collection[T], parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    val elements = collection.asScala.toSeq
+    val dataSource = new CollectionDataSource(elements)
+    source(dataSource, parallelism, conf, description)
+  }
+
+  /** Creates a source stream by delegating to `streamApp.source` and wrapping the result. */
+  def source[T](dataSource: DataSource, parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] =
+    new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
+
+  /** Submits the wrapped app through the client context. */
+  def run(): Unit = context.submit(streamApp)
+
+}