You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/14 10:53:11 UTC

[1/2] incubator-beam git commit: [BEAM-734] Add support for Spark Streaming Listeners.

Repository: incubator-beam
Updated Branches:
  refs/heads/master a0f649eac -> d790dfe1b


[BEAM-734] Add support for Spark Streaming Listeners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b49abcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b49abcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b49abcf

Branch: refs/heads/master
Commit: 4b49abcf7d248e033b2bd8435dff031261f35b73
Parents: a0f649e
Author: Sela <an...@paypal.com>
Authored: Sun Oct 9 13:44:58 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Oct 14 12:58:47 2016 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineOptions.java  | 18 ++++++++++++++++++
 .../SparkRunnerStreamingContextFactory.java       |  8 ++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7afb68c..4c20b10 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,8 @@
 package org.apache.beam.runners.spark;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -26,6 +28,8 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
 
 /**
  * Spark runner pipeline options.
@@ -88,4 +92,18 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   @JsonIgnore
   JavaSparkContext getProvidedSparkContext();
   void setProvidedSparkContext(JavaSparkContext jsc);
+
+  @Description("Spark streaming listeners")
+  @Default.InstanceFactory(EmptyListenersList.class)
+  @JsonIgnore
+  List<JavaStreamingListener> getListeners();
+  void setListeners(List<JavaStreamingListener> listeners);
+
+  /** Default factory for the listeners option: creates a new empty list, to avoid handling null. */
+  static class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> {
+    @Override
+    public List<JavaStreamingListener> create(PipelineOptions options) {
+      return new ArrayList<>();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index b7a407c..79c87fb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -33,6 +33,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,6 +91,12 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     }
     jssc.checkpoint(checkpointDir);
 
+    // Register each configured listener; its class name is passed as the "{}" log argument.
+    for (JavaStreamingListener listener: options.getListeners()) {
+      LOG.info("Registered listener {}.", listener.getClass().getSimpleName());
+      jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+    }
+
     return jssc;
   }
 


[2/2] incubator-beam git commit: This closes #1072

Posted by am...@apache.org.
This closes #1072


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d790dfe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d790dfe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d790dfe1

Branch: refs/heads/master
Commit: d790dfe1ba7da5387944e47389cd7b35061e2782
Parents: a0f649e 4b49abc
Author: Sela <an...@paypal.com>
Authored: Fri Oct 14 13:32:14 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Oct 14 13:32:14 2016 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineOptions.java  | 18 ++++++++++++++++++
 .../SparkRunnerStreamingContextFactory.java       |  8 ++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------