You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/14 10:53:11 UTC
[1/2] incubator-beam git commit: [BEAM-734] Add support for Spark
Streaming Listeners.
Repository: incubator-beam
Updated Branches:
refs/heads/master a0f649eac -> d790dfe1b
[BEAM-734] Add support for Spark Streaming Listeners.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b49abcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b49abcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b49abcf
Branch: refs/heads/master
Commit: 4b49abcf7d248e033b2bd8435dff031261f35b73
Parents: a0f649e
Author: Sela <an...@paypal.com>
Authored: Sun Oct 9 13:44:58 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Oct 14 12:58:47 2016 +0300
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineOptions.java | 18 ++++++++++++++++++
.../SparkRunnerStreamingContextFactory.java | 8 ++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7afb68c..4c20b10 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,8 @@
package org.apache.beam.runners.spark;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -26,6 +28,8 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
/**
* Spark runner pipeline options.
@@ -88,4 +92,18 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
@JsonIgnore
JavaSparkContext getProvidedSparkContext();
void setProvidedSparkContext(JavaSparkContext jsc);
+
+ @Description("Spark streaming listeners")
+ @Default.InstanceFactory(EmptyListenersList.class)
+ @JsonIgnore
+ List<JavaStreamingListener> getListeners();
+ void setListeners(List<JavaStreamingListener> listeners);
+
+ /** Default value factory that returns a new empty list, to avoid handling null. */
+ static class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
+ @Override
+ public List<JavaStreamingListener> create(PipelineOptions options) {
+ return new ArrayList<>();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index b7a407c..79c87fb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -33,6 +33,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +91,12 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
}
jssc.checkpoint(checkpointDir);
+ // Register each user-provided streaming listener, logging its class name once it is added.
+ for (JavaStreamingListener listener : options.getListeners()) {
+ jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+ LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
+ }
+
return jssc;
}
[2/2] incubator-beam git commit: This closes #1072
Posted by am...@apache.org.
This closes #1072
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d790dfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d790dfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d790dfe1
Branch: refs/heads/master
Commit: d790dfe1ba7da5387944e47389cd7b35061e2782
Parents: a0f649e 4b49abc
Author: Sela <an...@paypal.com>
Authored: Fri Oct 14 13:32:14 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Oct 14 13:32:14 2016 +0300
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineOptions.java | 18 ++++++++++++++++++
.../SparkRunnerStreamingContextFactory.java | 8 ++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------