You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/23 07:05:20 UTC

spark git commit: [SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug

Repository: spark
Updated Branches:
  refs/heads/master 3c3fa632e -> e0f7fb7f9


[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug

The `reduceByWindow` method in the Java API is not actually Java-compatible; this change makes it Java-compatible.

The current solution deprecates the old method and adds a new API. However, since the old API is not actually correct, is keeping it meaningful, or does it exist only to preserve binary compatibility? Also, even adding a new API still requires a MiMa exclusion, so I'm not sure which is the better solution: changing the API in place, or deprecating the old API and adding a new one.

Author: jerryshao <sa...@intel.com>

Closes #4104 from jerryshao/SPARK-5315 and squashes the following commits:

5bc8987 [jerryshao] Address the comment
c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible
8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0f7fb7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0f7fb7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0f7fb7f

Branch: refs/heads/master
Commit: e0f7fb7f9f497b34d42f9ba147197cf9ffc51607
Parents: 3c3fa63
Author: jerryshao <sa...@intel.com>
Authored: Thu Jan 22 22:04:21 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jan 22 22:04:21 2015 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  4 ++++
 .../streaming/api/java/JavaDStreamLike.scala    | 20 ++++++++++++++++++++
 .../apache/spark/streaming/JavaAPISuite.java    | 20 ++++++++++++++++++--
 3 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0f7fb7f/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 127973b..bc5d81f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -90,6 +90,10 @@ object MimaExcludes {
             // SPARK-5297 Java FileStream do not work with custom key/values
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
+          ) ++ Seq(
+            // SPARK-5315 Spark Streaming Java API returns Scala DStream
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
           )
 
         case v if v.startsWith("1.2") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e0f7fb7f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index e0542ed..c382a12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
+   * @deprecated As this API is not Java compatible.
    */
+  @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
   def reduceByWindow(
       reduceFunc: (T, T) => T,
       windowDuration: Duration,
@@ -222,6 +224,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a sliding window over this DStream.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByWindow(
+      reduceFunc: JFunction2[T, T, T],
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): JavaDStream[T] = {
+    dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream. However, the reduction is done incrementally
    * using the old window's reduced value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)

http://git-wip-us.apache.org/repos/asf/spark/blob/e0f7fb7f/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index d92e7fe..d4c4074 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   @SuppressWarnings("unchecked")
   @Test
-  public void testReduceByWindow() {
+  public void testReduceByWindowWithInverse() {
+    testReduceByWindow(true);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceByWindowWithoutInverse() {
+    testReduceByWindow(false);
+  }
+
+  private void testReduceByWindow(boolean withInverse) {
     List<List<Integer>> inputData = Arrays.asList(
         Arrays.asList(1,2,3),
         Arrays.asList(4,5,6),
@@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(24));
 
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+    JavaDStream<Integer> reducedWindowed = null;
+    if (withInverse) {
+      reducedWindowed = stream.reduceByWindow(new IntegerSum(),
         new IntegerDifference(), new Duration(2000), new Duration(1000));
+    } else {
+      reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+        new Duration(2000), new Duration(1000));
+    }
     JavaTestUtils.attachTestOutputStream(reducedWindowed);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org