Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/21 17:28:01 UTC

spark git commit: [SPARK-13248][STREAMING] Remove deprecated Streaming APIs.

Repository: spark
Updated Branches:
  refs/heads/master d9efe63ec -> 1a340da8d


[SPARK-13248][STREAMING] Remove deprecated Streaming APIs.

Remove deprecated Streaming APIs and adjust sample applications.

Author: Luciano Resende <lr...@apache.org>

Closes #11139 from lresende/streaming-deprecated-apis.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a340da8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a340da8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a340da8

Branch: refs/heads/master
Commit: 1a340da8d7590d831b040c74f5a6eb560e14d585
Parents: d9efe63
Author: Luciano Resende <lr...@apache.org>
Authored: Sun Feb 21 16:27:56 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 21 16:27:56 2016 +0000

----------------------------------------------------------------------
 .../JavaRecoverableNetworkWordCount.java        | 18 ++--
 project/MimaExcludes.scala                      | 13 +++
 .../spark/streaming/StreamingContext.scala      | 38 --------
 .../streaming/api/java/JavaDStreamLike.scala    | 64 -------------
 .../api/java/JavaStreamingContext.scala         | 96 --------------------
 .../spark/streaming/dstream/DStream.scala       | 22 -----
 .../spark/streaming/scheduler/BatchInfo.scala   |  3 -
 .../spark/streaming/DStreamClosureSuite.scala   |  7 --
 8 files changed, 22 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc8cbcd..f9929fc 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -26,17 +26,14 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
+
 import com.google.common.io.Files;
 
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.api.java.function.*;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
@@ -44,7 +41,6 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
  * Use this singleton to get or register a Broadcast variable.
@@ -204,13 +200,17 @@ public final class JavaRecoverableNetworkWordCount {
     final int port = Integer.parseInt(args[1]);
     final String checkpointDirectory = args[2];
     final String outputPath = args[3];
-    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
+
+    // Function to create JavaStreamingContext without any output operations
+    // (used to detect the new context)
+    Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
       @Override
-      public JavaStreamingContext create() {
+      public JavaStreamingContext call() {
         return createContext(ip, port, checkpointDirectory, outputPath);
       }
     };
-    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
+
+    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
     ssc.start();
     ssc.awaitTermination();
   }
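
The hunk above switches the example from the removed JavaStreamingContextFactory to the Function0-based getOrCreate. For comparison, a minimal Scala sketch of the same checkpoint-recovery pattern (the host, port, and checkpoint path are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object RecoverySketch {
      // Build a fresh context; streams must be defined here so they become
      // part of the checkpointed graph.
      def createContext(checkpointDir: String): StreamingContext = {
        val conf = new SparkConf().setAppName("RecoverySketch")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(checkpointDir)
        ssc.socketTextStream("localhost", 9999).print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        val checkpointDir = "/tmp/recovery-sketch"  // illustrative path
        // Recreate from checkpoint data if present, else call createContext.
        val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
        ssc.start()
        ssc.awaitTermination()
      }
    }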

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 65375a3..8f31a81 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -220,6 +220,19 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.streaming.zeromq.ZeroMQReceiver"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver$Supervisor")
       ) ++ Seq(
+        // SPARK-13248 Remove deprecated Streaming APIs.
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions$default$4"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.awaitTermination"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.networkStream"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.api.java.JavaStreamingContextFactory"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.sc"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaDStreamLike.foreach"),
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate")
+      ) ++ Seq(
         // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index a1b25c9..25e6157 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -271,20 +271,6 @@ class StreamingContext private[streaming] (
 
   /**
    * Create an input stream with any arbitrary user implemented receiver.
-   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
-   * @param receiver Custom implementation of Receiver
-   *
-   * @deprecated As of 1.0.0 replaced by `receiverStream`.
-   */
-  @deprecated("Use receiverStream", "1.0.0")
-  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
-    withNamedScope("network stream") {
-      receiverStream(receiver)
-    }
-  }
-
-  /**
-   * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
@@ -624,18 +610,6 @@ class StreamingContext private[streaming] (
   /**
    * Wait for the execution to stop. Any exceptions that occur during the execution
    * will be thrown in this thread.
-   * @param timeout time to wait in milliseconds
-   *
-   * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
-   */
-  @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
-  def awaitTermination(timeout: Long) {
-    waiter.waitForStopOrError(timeout)
-  }
-
-  /**
-   * Wait for the execution to stop. Any exceptions that occurs during the execution
-   * will be thrown in this thread.
    *
    * @param timeout time to wait in milliseconds
    * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -778,18 +752,6 @@ object StreamingContext extends Logging {
   }
 
   /**
-   * @deprecated As of 1.3.0, replaced by implicit functions in the DStream companion object.
-   *             This is kept here only for backward compatibility.
-   */
-  @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
-    "kept here only for backward compatibility.", "1.3.0")
-  def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
-      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
-    : PairDStreamFunctions[K, V] = {
-    DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
-  }
-
-  /**
    * :: Experimental ::
    *
    * Either return the "active" StreamingContext (that is, started but not stopped), or create a
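
The three removals above all have direct replacements. A hedged migration sketch (the receiver implementation and durations are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Placeholder receiver; a real one would manage a connection in onStart.
    class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      def onStart(): Unit = store("hello")
      def onStop(): Unit = ()
    }

    object StreamingMigrationSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(1))

        // networkStream(receiver) -> receiverStream(receiver)
        val lines = ssc.receiverStream(new DummyReceiver)

        // StreamingContext.toPairDStreamFunctions -> implicit conversion on the
        // DStream companion object; pair operations resolve without an import:
        lines.map(word => (word, 1)).reduceByKey(_ + _).print()

        ssc.start()
        // awaitTermination(timeout) -> awaitTerminationOrTimeout(timeout),
        // which also reports whether the context stopped within the timeout:
        val stopped: Boolean = ssc.awaitTerminationOrTimeout(10000L)
        println(s"stopped within timeout: $stopped")
      }
    }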

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 9931a46..65aab2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -220,26 +220,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @deprecated As this API is not Java compatible.
-   */
-  @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
-  def reduceByWindow(
-      reduceFunc: (T, T) => T,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[T] = {
-    dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
-  }
-
-  /**
-   * Return a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a sliding window over this DStream.
-   * @param reduceFunc associative and commutative reduce function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
    */
   def reduceByWindow(
       reduceFunc: JFunction2[T, T, T],
@@ -284,50 +264,6 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated  As of release 0.9.0, replaced by foreachRDD
-   */
-  @deprecated("Use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: JFunction[R, Void]) {
-    foreachRDD(foreachFunc)
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated  As of release 0.9.0, replaced by foreachRDD
-   */
-  @deprecated("Use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: JFunction2[R, Time, Void]) {
-    foreachRDD(foreachFunc)
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated  As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
-   */
-  @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
-  def foreachRDD(foreachFunc: JFunction[R, Void]) {
-    dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated  As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
-   */
-  @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
-  def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
-    dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: JVoidFunction[R]) {
     dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
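
For Java-API users, the surviving overloads are the JFunction2-based reduceByWindow and the VoidFunction-based foreachRDD. A hedged sketch, written in Scala against the Java API (jds is an assumed JavaDStream[String]):

    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.function.{Function2 => JFunction2, VoidFunction => JVoidFunction}
    import org.apache.spark.streaming.Durations
    import org.apache.spark.streaming.api.java.JavaDStream

    def migrated(jds: JavaDStream[String]): Unit = {
      // The (T, T) => T reduceByWindow overload is gone; the Java-compatible
      // JFunction2 overload remains:
      val reduced = jds.reduceByWindow(
        new JFunction2[String, String, String] {
          override def call(a: String, b: String): String = a + b
        },
        Durations.seconds(30),
        Durations.seconds(10))

      // foreach and the JFunction[R, Void] variants are gone; use the
      // VoidFunction-based foreachRDD:
      reduced.foreachRDD(new JVoidFunction[JavaRDD[String]] {
        override def call(rdd: JavaRDD[String]): Unit = println(rdd.count())
      })
    }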

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7a25ce5..f8f1336 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -155,12 +155,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   val sparkContext = new JavaSparkContext(ssc.sc)
 
   /**
-   * @deprecated As of 0.9.0, replaced by `sparkContext`
-   */
-  @deprecated("use sparkContext", "0.9.0")
-  val sc: JavaSparkContext = sparkContext
-
-  /**
    * Create an input stream from network source hostname:port. Data is received using
    * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
    * lines.
@@ -571,17 +565,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
   /**
    * Wait for the execution to stop. Any exceptions that occur during the execution
    * will be thrown in this thread.
-   * @param timeout time to wait in milliseconds
-   * @deprecated As of 1.3.0, replaced by `awaitTerminationOrTimeout(Long)`.
-   */
-  @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
-  def awaitTermination(timeout: Long): Unit = {
-    ssc.awaitTermination(timeout)
-  }
-
-  /**
-   * Wait for the execution to stop. Any exceptions that occurs during the execution
-   * will be thrown in this thread.
    *
    * @param timeout time to wait in milliseconds
    * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
@@ -630,78 +613,6 @@ object JavaStreamingContext {
    * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
-   * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
-   * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
-   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
-  def getOrCreate(
-      checkpointPath: String,
-      factory: JavaStreamingContextFactory
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      factory.create.ssc
-    })
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
-   * @param hadoopConf     Hadoop configuration if necessary for reading from any HDFS compatible
-   *                       file system
-   * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
-   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
-  def getOrCreate(
-      checkpointPath: String,
-      hadoopConf: Configuration,
-      factory: JavaStreamingContextFactory
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      factory.create.ssc
-    }, hadoopConf)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
-   * @param hadoopConf     Hadoop configuration if necessary for reading from any HDFS compatible
-   *                       file system
-   * @param createOnError  Whether to create a new JavaStreamingContext if there is an
-   *                       error in reading checkpoint data.
-   * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory.
-   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
-  def getOrCreate(
-      checkpointPath: String,
-      hadoopConf: Configuration,
-      factory: JavaStreamingContextFactory,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      factory.create.ssc
-    }, hadoopConf, createOnError)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided factory
-   * will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
    * @param creatingFunc   Function to create a new JavaStreamingContext
    */
   def getOrCreate(
@@ -767,10 +678,3 @@ object JavaStreamingContext {
    */
   def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
 }
-
-/**
- * Factory interface for creating a new JavaStreamingContext
- */
-trait JavaStreamingContextFactory {
-  def create(): JavaStreamingContext
-}
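
With the factory trait gone, the Java-side recovery path is the Function0-based getOrCreate shown in the example above. A minimal sketch of wrapping an arbitrary creation function (createContext is a placeholder):

    import org.apache.spark.api.java.function.{Function0 => JFunction0}
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    def recover(checkpointPath: String,
                createContext: () => JavaStreamingContext): JavaStreamingContext = {
      val jssc = JavaStreamingContext.getOrCreate(checkpointPath,
        new JFunction0[JavaStreamingContext] {
          override def call(): JavaStreamingContext = createContext()
        })
      // The deprecated `sc` val is also gone; use `sparkContext` instead:
      println(jssc.sparkContext.appName)
      jssc
    }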

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 70e1d8a..102a030 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -621,28 +621,6 @@ abstract class DStream[T: ClassTag] (
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated As of 0.9.0, replaced by `foreachRDD`.
-   */
-  @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
-   *
-   * @deprecated As of 0.9.0, replaced by `foreachRDD`.
-   */
-  @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
-    this.foreachRDD(foreachFunc)
-  }
-
-  /**
-   * Apply a function to each RDD in this DStream. This is an output operator, so
-   * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
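
Callers of the removed DStream.foreach overloads move to foreachRDD one-for-one. A minimal sketch (ds is an assumed DStream[Int]):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.dstream.DStream

    def migrated(ds: DStream[Int]): Unit = {
      // ds.foreach(f) -> ds.foreachRDD(f), in both arities:
      ds.foreachRDD((rdd: RDD[Int]) => println(rdd.count()))
      ds.foreachRDD((rdd: RDD[Int], time: Time) => println(s"$time: ${rdd.count()}"))
    }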

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 436eb0a..5b2b959 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,9 +41,6 @@ case class BatchInfo(
     outputOperationInfos: Map[Int, OutputOperationInfo]
   ) {
 
-  @deprecated("Use streamIdToInputInfo instead", "1.5.0")
-  def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
-
   /**
    * Time taken for the first job of this batch to start processing from the time this batch
    * was submitted to the streaming scheduler. Essentially, it is
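
Code that read the removed streamIdToNumRecords can derive the same map from streamIdToInputInfo. A minimal sketch (batchInfo is an assumed BatchInfo instance):

    import org.apache.spark.streaming.scheduler.BatchInfo

    def numRecordsPerStream(batchInfo: BatchInfo): Map[Int, Long] =
      batchInfo.streamIdToInputInfo.mapValues(_.numRecords).toMap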

http://git-wip-us.apache.org/repos/asf/spark/blob/1a340da8/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
index e897de3..1fc34f5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamClosureSuite.scala
@@ -56,7 +56,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
     testFilter(dstream)
     testMapPartitions(dstream)
     testReduce(dstream)
-    testForeach(dstream)
     testForeachRDD(dstream)
     testTransform(dstream)
     testTransformWith(dstream)
@@ -106,12 +105,6 @@ class DStreamClosureSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def testReduce(ds: DStream[Int]): Unit = expectCorrectException {
     ds.reduce { case (_, _) => return; 1 }
   }
-  private def testForeach(ds: DStream[Int]): Unit = {
-    val foreachF1 = (rdd: RDD[Int], t: Time) => return
-    val foreachF2 = (rdd: RDD[Int]) => return
-    expectCorrectException { ds.foreach(foreachF1) }
-    expectCorrectException { ds.foreach(foreachF2) }
-  }
   private def testForeachRDD(ds: DStream[Int]): Unit = {
     val foreachRDDF1 = (rdd: RDD[Int], t: Time) => return
     val foreachRDDF2 = (rdd: RDD[Int]) => return

