Posted to commits@beam.apache.org by jb...@apache.org on 2016/12/07 11:51:20 UTC
[1/2] incubator-beam git commit: [BEAM-329] Update Spark runner README.
Repository: incubator-beam
Updated Branches:
refs/heads/master 9ccf6dbea -> 02bb8c375
[BEAM-329] Update Spark runner README.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dce3a196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dce3a196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dce3a196
Branch: refs/heads/master
Commit: dce3a196a3a26fdd42225520faf3d9084ee48123
Parents: 9ccf6db
Author: Sela <an...@paypal.com>
Authored: Wed Dec 7 11:20:07 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Dec 7 12:49:21 2016 +0100
----------------------------------------------------------------------
runners/spark/README.md | 59 +++++++-------------
.../beam/runners/spark/examples/WordCount.java | 5 +-
2 files changed, 21 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dce3a196/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index ef42fa7..aad65b3 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -38,32 +38,25 @@ with Apache Spark. This runner allows to execute both batch and streaming pipeli
- Side inputs/outputs
- Encoding
-### Sources and Sinks
-
-- Text
-- Hadoop
-- Avro
-- Kafka
-
### Fault-Tolerance
The Spark runner fault-tolerance guarantees the same guarantees as [Apache Spark](http://spark.apache.org/).
### Monitoring
-The Spark runner supports monitoring via Beam Aggregators implemented on top of Spark's [Accumulators](http://spark.apache.org/docs/latest/programming-guide.html#accumulators).
-Spark also provides a web UI for monitoring, more details [here](http://spark.apache.org/docs/latest/monitoring.html).
+The Spark runner supports user-defined counters via Beam Aggregators implemented on top of Spark's [Accumulators](http://spark.apache.org/docs/1.6.3/programming-guide.html#accumulators).
+The Aggregators (defined by the pipeline author) and Spark's internal metrics are reported using Spark's [metrics system](http://spark.apache.org/docs/1.6.3/monitoring.html#metrics).
+Spark also provides a web UI for monitoring, more details [here](http://spark.apache.org/docs/1.6.3/monitoring.html).
## Beam Model support
### Batch
-The Spark runner provides support for batch processing of Beam bounded PCollections as Spark [RDD](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds)s.
+The Spark runner provides full support for the Beam Model in batch processing via Spark [RDD](http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds)s.
### Streaming
-The Spark runner currently provides partial support for stream processing of Beam unbounded PCollections as Spark [DStream](http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams)s.
-Currently, both *FixedWindows* and *SlidingWindows* are supported, but only with processing-time triggers and discarding pane.
+Providing full support for the Beam Model in streaming pipelines is under development. To follow-up you can subscribe to our [mailing list](http://beam.incubator.apache.org/get-started/support/).
### issue tracking
@@ -84,19 +77,21 @@ Then switch to the newly created directory and run Maven to build the Apache Bea
Now Apache Beam and the Spark Runner are installed in your local maven repository.
-If we wanted to run a Beam pipeline with the default options of a single threaded Spark
-instance in local mode, we would do the following:
+If we wanted to run a Beam pipeline with the default options of a Spark instance in local mode,
+we would do the following:
Pipeline p = <logic for pipeline creation >
- EvaluationResult result = SparkRunner.create().run(p);
+ PipelineResult result = p.run();
+ result.waitUntilFinish();
To create a pipeline runner to run against a different Spark cluster, with a custom master url we
would do the following:
- Pipeline p = <logic for pipeline creation >
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setSparkMaster("spark://host:port");
- EvaluationResult result = SparkRunner.create(options).run(p);
+ Pipeline p = <logic for pipeline creation >
+ PipelineResult result = p.run();
+ result.waitUntilFinish();
## Word Count Example
@@ -108,12 +103,11 @@ Switch to the Spark runner directory:
cd runners/spark
-Then run the [word count example][wc] from the SDK using a single threaded Spark instance
-in local mode:
+Then run the [word count example][wc] from the SDK using a Spark instance in local mode:
- mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \
- -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
- -DsparkMaster=local
+ mvn exec:exec -DmainClass=org.apache.beam.runners.spark.examples.WordCount \
+ -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
+ -DsparkMaster=local
Check the output by running:
@@ -122,24 +116,9 @@ Check the output by running:
__Note: running examples using `mvn exec:exec` only works for Spark local mode at the
moment. See the next section for how to run on a cluster.__
-[wc]: https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+[wc]: https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
## Running on a Cluster
Spark Beam pipelines can be run on a cluster using the `spark-submit` command.
-First copy a text document to HDFS:
-
- curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt | hadoop fs -put - kinglear.txt
-
-Then run the word count example using Spark submit with the `yarn-client` master
-(`yarn-cluster` works just as well):
-
- spark-submit \
- --class org.apache.beam.examples.WordCount \
- --master yarn-client \
- target/spark-runner-*-spark-app.jar \
- --inputFile=kinglear.txt --output=out --runner=SparkRunner --sparkMaster=yarn-client
-
-Check the output by running:
-
- hadoop fs -tail out-00000-of-00002
+TBD pending native HDFS support (currently blocked by [BEAM-59](https://issues.apache.org/jira/browse/BEAM-59)).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dce3a196/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 38dae38..d7e5207 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
- * Duplicated to avoid dependency on beam-examples.
+ * Duplicated from beam-examples-java to avoid dependency.
*/
public class WordCount {
@@ -126,8 +126,7 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- //TODO: remove withoutValidation once possible
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation())
+ p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
[2/2] incubator-beam git commit: [BEAM-329] This closes #1532
Posted by jb...@apache.org.
[BEAM-329] This closes #1532
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/02bb8c37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/02bb8c37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/02bb8c37
Branch: refs/heads/master
Commit: 02bb8c375c48847b1686d70184fb194500a62e8c
Parents: 9ccf6db dce3a19
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Dec 7 12:51:09 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Dec 7 12:51:09 2016 +0100
----------------------------------------------------------------------
runners/spark/README.md | 59 +++++++-------------
.../beam/runners/spark/examples/WordCount.java | 5 +-
2 files changed, 21 insertions(+), 43 deletions(-)
----------------------------------------------------------------------