Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 21:18:24 UTC

[7/9] git commit: Merge remote-tracking branch 'apache/master' into dstream-move

Merge remote-tracking branch 'apache/master' into dstream-move

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/777c181d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/777c181d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/777c181d

Branch: refs/heads/master
Commit: 777c181d2f583570956724f9cbe20eb1dc7048f1
Parents: 034f89a 405bfe8
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 21:59:51 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 21:59:51 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             |  4 +--
 .../streaming/examples/RawNetworkGrep.scala     |  2 +-
 .../examples/RecoverableNetworkWordCount.scala  |  2 +-
 .../streaming/examples/TwitterAlgebirdCMS.scala |  4 +--
 .../streaming/examples/TwitterAlgebirdHLL.scala |  4 +--
 .../streaming/examples/TwitterPopularTags.scala |  4 +--
 .../examples/clickstream/PageViewStream.scala   |  2 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 26 ++++++++++++++++--
 .../spark/streaming/dstream/DStream.scala       | 29 +++++++++++++++-----
 .../dstream/PairDStreamFunctions.scala          |  4 +--
 .../spark/streaming/BasicOperationsSuite.scala  |  2 +-
 .../spark/streaming/StreamingContextSuite.scala |  2 +-
 12 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/docs/streaming-programming-guide.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --cc examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9,d51e6e9..8c5d0bd
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@@ -82,7 -82,7 +82,7 @@@ object RecoverableNetworkWordCount 
      val lines = ssc.socketTextStream(ip, port)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
--    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
++    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
        val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
        println(counts)
        println("Appending to " + outputFile.getAbsolutePath)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 8014db6,0000000..a7c4cca
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@@ -1,742 -1,0 +1,757 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.dstream
 +
++
++import scala.deprecated
 +import scala.collection.mutable.HashMap
 +import scala.reflect.ClassTag
 +
- import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
++import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 +
 +import org.apache.spark.Logging
 +import org.apache.spark.rdd.RDD
 +import org.apache.spark.storage.StorageLevel
 +import org.apache.spark.util.MetadataCleaner
 +import org.apache.spark.streaming._
 +import org.apache.spark.streaming.StreamingContext._
 +import org.apache.spark.streaming.scheduler.Job
 +import org.apache.spark.streaming.Duration
 +
- 
 +/**
 + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
 + * for more details on RDDs). DStreams can either be created from live data (such as data from
 + * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
 + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
 + * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
 + * by a parent DStream.
 + *
 + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
 + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
 + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
 + * implicit conversions when `org.apache.spark.streaming.StreamingContext._` is imported.
 + *
 + * Each DStream internally is characterized by a few basic properties:
 + *  - A list of other DStreams that the DStream depends on
 + *  - A time interval at which the DStream generates an RDD
 + *  - A function that is used to generate an RDD after each time interval
 + */
 +
 +abstract class DStream[T: ClassTag] (
 +    @transient private[streaming] var ssc: StreamingContext
 +  ) extends Serializable with Logging {
 +
 +  // =======================================================================
 +  // Methods that should be implemented by subclasses of DStream
 +  // =======================================================================
 +
 +  /** Time interval after which the DStream generates an RDD */
 +  def slideDuration: Duration
 +
 +  /** List of parent DStreams on which this DStream depends */
 +  def dependencies: List[DStream[_]]
 +
 +  /** Method that generates an RDD for the given time */
 +  def compute (validTime: Time): Option[RDD[T]]
 +
 +  // =======================================================================
 +  // Methods and fields available on all DStreams
 +  // =======================================================================
 +
 +  // RDDs generated, marked as private[streaming] so that testsuites can access it
 +  @transient
 +  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
 +
 +  // Time zero for the DStream
 +  private[streaming] var zeroTime: Time = null
 +
 +  // Duration for which the DStream will remember each RDD created
 +  private[streaming] var rememberDuration: Duration = null
 +
 +  // Storage level of the RDDs in the stream
 +  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
 +
 +  // Checkpoint details
 +  private[streaming] val mustCheckpoint = false
 +  private[streaming] var checkpointDuration: Duration = null
 +  private[streaming] val checkpointData = new DStreamCheckpointData(this)
 +
 +  // Reference to whole DStream graph
 +  private[streaming] var graph: DStreamGraph = null
 +
 +  private[streaming] def isInitialized = (zeroTime != null)
 +
 +  // Duration for which the DStream requires its parent DStream to remember each RDD created
 +  private[streaming] def parentRememberDuration = rememberDuration
 +
 +  /** Return the StreamingContext associated with this DStream */
 +  def context = ssc
 +
 +  /** Persist the RDDs of this DStream with the given storage level */
 +  def persist(level: StorageLevel): DStream[T] = {
 +    if (this.isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change storage level of an DStream after streaming context has started")
 +    }
 +    this.storageLevel = level
 +    this
 +  }
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
 +
 +  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
 +  def cache(): DStream[T] = persist()
 +
 +  /**
 +   * Enable periodic checkpointing of RDDs of this DStream
 +   * @param interval Time interval after which generated RDD will be checkpointed
 +   */
 +  def checkpoint(interval: Duration): DStream[T] = {
 +    if (isInitialized) {
 +      throw new UnsupportedOperationException(
 +        "Cannot change checkpoint interval of an DStream after streaming context has started")
 +    }
 +    persist()
 +    checkpointDuration = interval
 +    this
 +  }
 +
 +  /**
 +   * Initialize the DStream by setting the "zero" time, based on which
 +   * the validity of future times is calculated. This method also recursively initializes
 +   * its parent DStreams.
 +   */
 +  private[streaming] def initialize(time: Time) {
 +    if (zeroTime != null && zeroTime != time) {
 +      throw new Exception("ZeroTime is already initialized to " + zeroTime
 +        + ", cannot initialize it again to " + time)
 +    }
 +    zeroTime = time
 +
 +    // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
 +    if (mustCheckpoint && checkpointDuration == null) {
 +      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
 +      logInfo("Checkpoint interval automatically set to " + checkpointDuration)
 +    }
 +
 +    // Set the minimum value of the rememberDuration if not already set
 +    var minRememberDuration = slideDuration
 +    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
 +      minRememberDuration = checkpointDuration * 2  // times 2 just to be sure that the latest checkpoint is not forgotten
 +    }
 +    if (rememberDuration == null || rememberDuration < minRememberDuration) {
 +      rememberDuration = minRememberDuration
 +    }
 +
 +    // Initialize the dependencies
 +    dependencies.foreach(_.initialize(zeroTime))
 +  }
 +
 +  private[streaming] def validate() {
 +    assert(rememberDuration != null, "Remember duration is set to null")
 +
 +    assert(
 +      !mustCheckpoint || checkpointDuration != null,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
 +        " Please use DStream.checkpoint() to set the interval."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
 +      "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
 +      " or SparkContext.checkpoint() to set the checkpoint directory."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration >= slideDuration,
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
 +        "Please set it to at least " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
 +      "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
 +        checkpointDuration + " which is not a multiple of its slide time (" + slideDuration + "). " +
 +        "Please set it to a multiple of " + slideDuration + "."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || storageLevel != StorageLevel.NONE,
 +      "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
 +        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
 +        "storage level to use memory for better checkpointing performance."
 +    )
 +
 +    assert(
 +      checkpointDuration == null || rememberDuration > checkpointDuration,
 +      "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
 +        rememberDuration + " which is not more than the checkpoint interval (" +
 +        checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
 +    )
 +
 +    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
 +    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
 +    assert(
 +      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
 +      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
 +        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
 +        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
 +        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
 +        "set the Java property 'spark.cleaner.delay' to more than " +
 +        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
 +    )
 +
 +    dependencies.foreach(_.validate())
 +
 +    logInfo("Slide time = " + slideDuration)
 +    logInfo("Storage level = " + storageLevel)
 +    logInfo("Checkpoint interval = " + checkpointDuration)
 +    logInfo("Remember duration = " + rememberDuration)
 +    logInfo("Initialized and validated " + this)
 +  }
 +
 +  private[streaming] def setContext(s: StreamingContext) {
 +    if (ssc != null && ssc != s) {
 +      throw new Exception("Context is already set in " + this + ", cannot set it again")
 +    }
 +    ssc = s
 +    logInfo("Set context for " + this)
 +    dependencies.foreach(_.setContext(ssc))
 +  }
 +
 +  private[streaming] def setGraph(g: DStreamGraph) {
 +    if (graph != null && graph != g) {
 +      throw new Exception("Graph is already set in " + this + ", cannot set it again")
 +    }
 +    graph = g
 +    dependencies.foreach(_.setGraph(graph))
 +  }
 +
 +  private[streaming] def remember(duration: Duration) {
 +    if (duration != null && duration > rememberDuration) {
 +      rememberDuration = duration
 +      logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
 +    }
 +    dependencies.foreach(_.remember(parentRememberDuration))
 +  }
 +
 +  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
 +  private[streaming] def isTimeValid(time: Time): Boolean = {
 +    if (!isInitialized) {
 +      throw new Exception (this + " has not been initialized")
 +    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
 +      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
 +      false
 +    } else {
 +      logDebug("Time " + time + " is valid")
 +      true
 +    }
 +  }
 +
 +  /**
 +   * Retrieve a precomputed RDD of this DStream, or compute the RDD. This is an internal
 +   * method that should not be called directly.
 +   */
 +  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
 +    // If this DStream was not initialized (i.e., zeroTime not set), then do it
 +    // If RDD was already generated, then retrieve it from HashMap
 +    generatedRDDs.get(time) match {
 +
 +      // If an RDD was already generated and is being reused, then
 +      // probably all RDDs in this DStream will be reused and hence should be cached
 +      case Some(oldRDD) => Some(oldRDD)
 +
 +      // if RDD was not generated, and if the time is valid
 +      // (based on sliding time of this DStream), then generate the RDD
 +      case None => {
 +        if (isTimeValid(time)) {
 +          compute(time) match {
 +            case Some(newRDD) =>
 +              if (storageLevel != StorageLevel.NONE) {
 +                newRDD.persist(storageLevel)
 +                logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
 +              }
 +              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
 +                newRDD.checkpoint()
 +                logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
 +              }
 +              generatedRDDs.put(time, newRDD)
 +              Some(newRDD)
 +            case None =>
 +              None
 +          }
 +        } else {
 +          None
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Generate a SparkStreaming job for the given time. This is an internal method that
 +   * should not be called directly. This default implementation creates a job
 +   * that materializes the corresponding RDD. Subclasses of DStream may override this
 +   * to generate their own jobs.
 +   */
 +  private[streaming] def generateJob(time: Time): Option[Job] = {
 +    getOrCompute(time) match {
 +      case Some(rdd) => {
 +        val jobFunc = () => {
 +          val emptyFunc = { (iterator: Iterator[T]) => {} }
 +          context.sparkContext.runJob(rdd, emptyFunc)
 +        }
 +        Some(new Job(time, jobFunc))
 +      }
 +      case None => None
 +    }
 +  }
 +
 +  /**
 +   * Clear metadata that is older than `rememberDuration` of this DStream.
 +   * This is an internal method that should not be called directly. This default
 +   * implementation clears the old generated RDDs. Subclasses of DStream may override
 +   * this to clear their own metadata along with the generated RDDs.
 +   */
 +  private[streaming] def clearMetadata(time: Time) {
 +    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
 +    generatedRDDs --= oldRDDs.keys
 +    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
 +      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
 +    dependencies.foreach(_.clearMetadata(time))
 +  }
 +
 +  /* Adds metadata to the Stream while it is running.
 +   * This method should be overridden by subclasses of InputDStream.
 +   */
 +  private[streaming] def addMetadata(metadata: Any) {
 +    if (metadata != null) {
 +      logInfo("Dropping Metadata: " + metadata.toString)
 +    }
 +  }
 +
 +  /**
 +   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 +   * this stream. This is an internal method that should not be called directly. This is
 +   * a default implementation that saves only the file names of the checkpointed RDDs to
 +   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 +   * this method to save custom checkpoint data.
 +   */
 +  private[streaming] def updateCheckpointData(currentTime: Time) {
 +    logDebug("Updating checkpoint data for time " + currentTime)
 +    checkpointData.update(currentTime)
 +    dependencies.foreach(_.updateCheckpointData(currentTime))
 +    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
 +  }
 +
 +  private[streaming] def clearCheckpointData(time: Time) {
 +    logDebug("Clearing checkpoint data")
 +    checkpointData.cleanup(time)
 +    dependencies.foreach(_.clearCheckpointData(time))
 +    logDebug("Cleared checkpoint data")
 +  }
 +
 +  /**
 +   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
 +   * that should not be called directly. This is a default implementation that recreates RDDs
 +   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
 +   * override the updateCheckpointData() method would also need to override this method.
 +   */
 +  private[streaming] def restoreCheckpointData() {
 +    // Create RDDs from the checkpoint data
 +    logInfo("Restoring checkpoint data")
 +    checkpointData.restore()
 +    dependencies.foreach(_.restoreCheckpointData())
 +    logInfo("Restored checkpoint data")
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def writeObject(oos: ObjectOutputStream) {
 +    logDebug(this.getClass().getSimpleName + ".writeObject used")
 +    if (graph != null) {
 +      graph.synchronized {
 +        if (graph.checkpointInProgress) {
 +          oos.defaultWriteObject()
 +        } else {
 +          val msg = "Object of " + this.getClass.getName + " is being serialized " +
 +            "possibly as a part of closure of an RDD operation. This is because " +
 +            "the DStream object is being referred to from within the closure. " +
 +            "Please rewrite the RDD operation inside this DStream to avoid this. " +
 +            "This has been enforced to avoid bloating of Spark tasks " +
 +            "with unnecessary objects."
 +          throw new java.io.NotSerializableException(msg)
 +        }
 +      }
 +    } else {
 +      throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
 +    }
 +  }
 +
 +  @throws(classOf[IOException])
 +  private def readObject(ois: ObjectInputStream) {
 +    logDebug(this.getClass().getSimpleName + ".readObject used")
 +    ois.defaultReadObject()
 +    generatedRDDs = new HashMap[Time, RDD[T]] ()
 +  }
 +
 +  // =======================================================================
 +  // DStream operations
 +  // =======================================================================
 +
 +  /** Return a new DStream by applying a function to all elements of this DStream. */
 +  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
 +    new MappedDStream(this, context.sparkContext.clean(mapFunc))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a function to all elements of this DStream,
 +   * and then flattening the results
 +   */
 +  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
 +    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
 +  }
 +
 +  /** Return a new DStream containing only the elements that satisfy a predicate. */
 +  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
 +   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
 +   * an array.
 +   */
 +  def glom(): DStream[Array[T]] = new GlommedDStream(this)
 +
 +
 +  /**
 +   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
 +   * returned DStream has exactly numPartitions partitions.
 +   */
 +  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
 +   * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
 +   * of the RDD.
 +   */
 +  def mapPartitions[U: ClassTag](
 +      mapPartFunc: Iterator[T] => Iterator[U],
 +      preservePartitioning: Boolean = false
 +    ): DStream[U] = {
 +    new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
 +   * of this DStream.
 +   */
 +  def reduce(reduceFunc: (T, T) => T): DStream[T] =
 +    this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting each RDD
 +   * of this DStream.
 +   */
 +  def count(): DStream[Long] = {
 +    this.map(_ => (null, 1L))
 +        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
 +        .reduceByKey(_ + _)
 +        .map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains the counts of each distinct value in
 +   * each RDD of this DStream. Hash partitioning is used to generate
 +   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 +   * `numPartitions` not specified).
 +   */
 +  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
 +    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
-   def foreach(foreachFunc: RDD[T] => Unit) {
-     this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
++  @deprecated("use foreachRDD", "0.9.0")
++  def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
++
++  /**
++   * Apply a function to each RDD in this DStream. This is an output operator, so
++   * 'this' DStream will be registered as an output stream and therefore materialized.
++   */
++  @deprecated("use foreachRDD", "0.9.0")
++  def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
++
++  /**
++   * Apply a function to each RDD in this DStream. This is an output operator, so
++   * 'this' DStream will be registered as an output stream and therefore materialized.
++   */
++  def foreachRDD(foreachFunc: RDD[T] => Unit) {
++    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
 +  }
 +
 +  /**
 +   * Apply a function to each RDD in this DStream. This is an output operator, so
 +   * 'this' DStream will be registered as an output stream and therefore materialized.
 +   */
-   def foreach(foreachFunc: (RDD[T], Time) => Unit) {
++  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
 +    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
 +    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream.
 +   */
 +  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
 +    //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
 +    val cleanedF = context.sparkContext.clean(transformFunc)
 +    val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
 +      assert(rdds.length == 1)
 +      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
 +    }
 +    new TransformedDStream[U](Seq(this), realTransformFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedF = ssc.sparkContext.clean(transformFunc)
 +    transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD is generated by applying a function
 +   * on each RDD of 'this' DStream and 'other' DStream.
 +   */
 +  def transformWith[U: ClassTag, V: ClassTag](
 +      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
 +    ): DStream[V] = {
 +    val cleanedF = ssc.sparkContext.clean(transformFunc)
 +    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
 +      assert(rdds.length == 2)
 +      val rdd1 = rdds(0).asInstanceOf[RDD[T]]
 +      val rdd2 = rdds(1).asInstanceOf[RDD[U]]
 +      cleanedF(rdd1, rdd2, time)
 +    }
 +    new TransformedDStream[V](Seq(this, other), realTransformFunc)
 +  }
 +
 +  /**
 +   * Print the first ten elements of each RDD generated in this DStream. This is an output
 +   * operator, so this DStream will be registered as an output stream and therefore materialized.
 +   */
 +  def print() {
 +    def foreachFunc = (rdd: RDD[T], time: Time) => {
 +      val first11 = rdd.take(11)
 +      println ("-------------------------------------------")
 +      println ("Time: " + time)
 +      println ("-------------------------------------------")
 +      first11.take(10).foreach(println)
 +      if (first11.size > 10) println("...")
 +      println()
 +    }
 +    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
 +    ssc.registerOutputStream(newStream)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream. The new DStream generates RDDs with
 +   * the same interval as this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
 +   */
 +  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
 +
 +  /**
 +   * Return a new DStream in which each RDD contains all the elements seen in a
 +   * sliding window of time over this DStream.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
 +    new WindowedDStream(this, windowDuration, slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by reducing all
 +   * elements in a sliding window over this DStream. However, the reduction is done incrementally
 +   * using the old window's reduced value:
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   *  This is more efficient than reduceByWindow without an "inverse reduce" function.
 +   *  However, it is applicable only to "invertible reduce functions".
 +   * @param reduceFunc associative reduce function
 +   * @param invReduceFunc inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByWindow(
 +      reduceFunc: (T, T) => T,
 +      invReduceFunc: (T, T) => T,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[T] = {
 +      this.map(x => (1, x))
 +          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
 +          .map(_._2)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD has a single element generated by counting the number
 +   * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
 +   * Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
 +    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
 +  }
 +
 +  /**
 +   * Return a new DStream in which each RDD contains the count of distinct elements in
 +   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
 +   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 +   * `numPartitions` not specified).
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream.
 +   */
 +  def countByValueAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int = ssc.sc.defaultParallelism
 +    ): DStream[(T, Long)] = {
 +
 +    this.map(x => (x, 1L)).reduceByKeyAndWindow(
 +      (x: Long, y: Long) => x + y,
 +      (x: Long, y: Long) => x - y,
 +      windowDuration,
 +      slideDuration,
 +      numPartitions,
 +      (x: (T, Long)) => x._2 != 0L
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by unifying data of another DStream with this DStream.
 +   * @param that Another DStream having the same slideDuration as this DStream.
 +   */
 +  def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
 +
 +  /**
 +   * Return all the RDDs defined by the Interval object (both end times included)
 +   */
 +  def slice(interval: Interval): Seq[RDD[T]] = {
 +    slice(interval.beginTime, interval.endTime)
 +  }
 +
 +  /**
 +   * Return all the RDDs between 'fromTime' and 'toTime' (both included)
 +   */
 +  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
 +    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
 +      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
 +    }
 +    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
 +      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
 +    }
 +    val alignedToTime = toTime.floor(slideDuration)
 +    val alignedFromTime = fromTime.floor(slideDuration)
 +
 +    logInfo("Slicing from " + fromTime + " to " + toTime +
 +      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
 +
 +    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
 +      if (time >= zeroTime) getOrCompute(time) else None
 +    })
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a Sequence file of serialized objects.
 +   * The file name at each batch interval is generated based on `prefix` and
 +   * `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsObjectFiles(prefix: String, suffix: String = "") {
 +    val saveFunc = (rdd: RDD[T], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsObjectFile(file)
 +    }
-     this.foreach(saveFunc)
++    this.foreachRDD(saveFunc)
 +  }
 +
 +  /**
 +   * Save each RDD in this DStream as a text file, using string representation
 +   * of elements. The file name at each batch interval is generated based on
 +   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsTextFiles(prefix: String, suffix: String = "") {
 +    val saveFunc = (rdd: RDD[T], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsTextFile(file)
 +    }
-     this.foreach(saveFunc)
++    this.foreachRDD(saveFunc)
 +  }
 +
 +  def register() {
 +    ssc.registerOutputStream(this)
 +  }
 +}
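
Since the scaladoc above spells out how window(), reduceByKeyAndWindow() and checkpointing interact,
here is a minimal usage sketch against this API (not part of the commit; the object name, master,
checkpoint path, host, port and durations are illustrative assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit conversions to PairDStreamFunctions

object WindowedCountsSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedCountsSketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    // The incremental (invertible) reduce below produces a ReducedWindowedDStream,
    // which must checkpoint, so a checkpoint directory is required.
    ssc.checkpoint("/tmp/windowed-counts-sketch")

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Word counts over a 30 second window sliding every 10 seconds, computed
    // incrementally: add counts entering the window, subtract counts leaving it.
    val windowedCounts = words.map(w => (w, 1L)).reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,
      (a: Long, b: Long) => a - b,
      Seconds(30), Seconds(10))

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}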

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f71dd17,0000000..6b3e483
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@@ -1,622 -1,0 +1,622 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.streaming.dstream
 +
 +import org.apache.spark.streaming.StreamingContext._
 +import org.apache.spark.streaming.dstream._
 +
 +import org.apache.spark.{Partitioner, HashPartitioner}
 +import org.apache.spark.SparkContext._
 +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
 +import org.apache.spark.storage.StorageLevel
 +
 +import scala.collection.mutable.ArrayBuffer
 +import scala.reflect.{ClassTag, classTag}
 +
 +import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 +import org.apache.hadoop.mapred.OutputFormat
 +import org.apache.hadoop.security.UserGroupInformation
 +import org.apache.hadoop.conf.Configuration
 +import org.apache.spark.streaming.{Time, Duration}
 +
 +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 +extends Serializable {
 +
 +  private[streaming] def ssc = self.ssc
 +
 +  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
 +    new HashPartitioner(numPartitions)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   */
 +  def groupByKey(): DStream[(K, Seq[V])] = {
 +    groupByKey(defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
 +   * generate the RDDs with `numPartitions` partitions.
 +   */
 +  def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
 +    groupByKey(defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
 +   * is used to control the partitioning of each RDD.
 +   */
 +  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
 +    val createCombiner = (v: V) => ArrayBuffer[V](v)
 +    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
 +    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
 +    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
 +      .asInstanceOf[DStream[(K, Seq[V])]]
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
 +   * with Spark's default number of partitions.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
 +    reduceByKey(reduceFunc, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
 +   * with `numPartitions` partitions.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
 +    reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 +   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
 +   * partitioning of each RDD.
 +   */
 +  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
 +  }
 +
 +  /**
 +   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
 +   * combineByKey for RDDs. Please refer to combineByKey in
 +   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
 +   */
 +  def combineByKey[C: ClassTag](
 +    createCombiner: V => C,
 +    mergeValue: (C, V) => C,
 +    mergeCombiner: (C, C) => C,
 +    partitioner: Partitioner,
 +    mapSideCombine: Boolean = true): DStream[(K, C)] = {
 +    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
 +   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
 +   * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
 +   * Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   */
 +  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
 +   * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream; if not specified
 +   *                       then Spark's default number of partitions will be used
 +   */
 +  def groupByKeyAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int
 +    ): DStream[(K, Seq[V])] = {
 +    groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
 +   */
 +  def groupByKeyAndWindow(
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner
 +    ): DStream[(K, Seq[V])] = {
 +    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
 +    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
 +    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
 +    self.groupByKey(partitioner)
 +        .window(windowDuration, slideDuration)
 +        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
 +        .asInstanceOf[DStream[(K, Seq[V])]]
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
 +   * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 +   * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 +   * the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 +   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 +   * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 +   * generate the RDDs with `numPartitions` partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param numPartitions  number of partitions of each RDD in the new DStream.
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      numPartitions: Int
 +    ): DStream[(K, V)] = {
 +    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
 +   * `DStream.reduceByKey()`, but applies it over a sliding window.
 +   * @param reduceFunc associative reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD
 +   *                       in the new DStream.
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner
 +    ): DStream[(K, V)] = {
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    self.reduceByKey(cleanedReduceFunc, partitioner)
 +        .window(windowDuration, slideDuration)
 +        .reduceByKey(cleanedReduceFunc, partitioner)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 +   * The reduced value over a new window is calculated using the old window's reduced value:
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   *
 +   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
 +   * However, it is applicable only to "invertible reduce functions".
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   * @param reduceFunc associative reduce function
 +   * @param invReduceFunc inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param filterFunc     Optional function to filter expired key-value pairs;
 +   *                       only pairs that satisfy the function are retained
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      invReduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration = self.slideDuration,
 +      numPartitions: Int = ssc.sc.defaultParallelism,
 +      filterFunc: ((K, V)) => Boolean = null
 +    ): DStream[(K, V)] = {
 +
 +    reduceByKeyAndWindow(
 +      reduceFunc, invReduceFunc, windowDuration,
 +      slideDuration, defaultPartitioner(numPartitions), filterFunc
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 +   * The reduced value over a new window is calculated using the old window's reduced value:
 +   *  1. reduce the new values that entered the window (e.g., adding new counts)
 +   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 +   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
 +   * However, it is applicable only to "invertible reduce functions".
 +   * @param reduceFunc     associative reduce function
 +   * @param invReduceFunc  inverse reduce function
 +   * @param windowDuration width of the window; must be a multiple of this DStream's
 +   *                       batching interval
 +   * @param slideDuration  sliding interval of the window (i.e., the interval after which
 +   *                       the new DStream will generate RDDs); must be a multiple of this
 +   *                       DStream's batching interval
 +   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @param filterFunc     Optional function to filter expired key-value pairs;
 +   *                       only pairs that satisfy the function are retained
 +   */
 +  def reduceByKeyAndWindow(
 +      reduceFunc: (V, V) => V,
 +      invReduceFunc: (V, V) => V,
 +      windowDuration: Duration,
 +      slideDuration: Duration,
 +      partitioner: Partitioner,
 +      filterFunc: ((K, V)) => Boolean
 +    ): DStream[(K, V)] = {
 +
 +    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
 +    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
 +    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
 +    new ReducedWindowedDStream[K, V](
 +      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
 +      windowDuration, slideDuration, partitioner
 +    )
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   the corresponding state key-value pair will be eliminated.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S]
 +    ): DStream[(K, S)] = {
 +    updateStateByKey(updateFunc, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   the corresponding state key-value pair will be eliminated.
 +   * @param numPartitions Number of partitions of each RDD in the new DStream.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S],
 +      numPartitions: Int
 +    ): DStream[(K, S)] = {
 +    updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of the key.
 +   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   * @param updateFunc State update function. If `this` function returns None, then
 +   *                   the corresponding state key-value pair will be eliminated.
 +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Seq[V], Option[S]) => Option[S],
 +      partitioner: Partitioner
 +    ): DStream[(K, S)] = {
 +    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
 +      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
 +    }
 +    updateStateByKey(newUpdateFunc, partitioner, true)
 +  }
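A minimal sketch of the Seq-based update function, assuming the `pairs` stream and the
checkpointed context from the sketch above; the state kept per word is its running total.

    // Add this batch's counts to the previous total; returning None would drop the key.
    val updateTotal = (newCounts: Seq[Int], total: Option[Int]) =>
      Some(newCounts.sum + total.getOrElse(0))
    val runningCounts = pairs.updateStateByKey[Int](updateTotal)
    runningCounts.print()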
 +
 +  /**
 +   * Return a new "state" DStream where the state for each key is updated by applying
 +   * the given function on the previous state of the key and the new values of each key.
 +   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   * @param updateFunc State update function. If this function returns None, then the
 +   *                   corresponding state key-value pair will be eliminated. Note that
 +   *                   this function may generate a tuple with a different key
 +   *                   than the input key. It is up to the developer to decide whether to
 +   *                   remember the partitioner despite the key being changed.
 +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
 +   * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 +   * @tparam S State type
 +   */
 +  def updateStateByKey[S: ClassTag](
 +      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
 +      partitioner: Partitioner,
 +      rememberPartitioner: Boolean
 +    ): DStream[(K, S)] = {
 +     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
 +  }
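A sketch of the iterator-based variant under the same assumptions: each partition's
(key, new values, previous state) triples are processed in a single pass, and the chosen
partitioner is remembered in the generated RDDs. The HashPartitioner and its partition
count here are purely illustrative.

    import org.apache.spark.HashPartitioner

    val updatePerPartition = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
      iter.map { case (word, newCounts, total) =>
        (word, newCounts.sum + total.getOrElse(0))
      }
    val partitionedCounts =
      pairs.updateStateByKey[Int](updatePerPartition, new HashPartitioner(2), true)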
 +
 +  /**
 +   * Return a new DStream by applying a map function to the value of each key-value pair in
 +   * 'this' DStream without changing the key.
 +   */
 +  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
 +    new MapValuedDStream[K, V, U](self, mapValuesFunc)
 +  }
 +
 +  /**
 +   * Return a new DStream by applying a flatMap function to the value of each key-value pair in
 +   * 'this' DStream without changing the key.
 +   */
 +  def flatMapValues[U: ClassTag](
 +      flatMapValuesFunc: V => TraversableOnce[U]
 +    ): DStream[(K, U)] = {
 +    new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
 +  }
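A brief sketch of both value-only transformations, assuming `pairs` is a DStream[(String, Int)];
because the keys are untouched, any partitioning by key carries over to the result.

    val doubled  = pairs.mapValues(_ * 2)                   // DStream[(String, Int)]
    val expanded = pairs.flatMapValues(count => 1 to count) // one pair per unit of count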
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number
 +   * of partitions.
 +   */
 +  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
 +    cogroup(other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   */
 +  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
 +    cogroup(other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 +   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
 +   */
 +  def cogroup[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (Seq[V], Seq[W]))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
 +    )
 +  }
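A sketch of cogroup over two hypothetical keyed streams on the same context; in every batch each
key maps to the Seq of values seen for it in each stream, with an empty Seq where the key is
absent on one side. The hosts and the comma-separated record format are assumptions.

    // Hypothetical sources keyed by the first comma-separated field (e.g. a user id).
    val clicks = ssc.socketTextStream("clicks-host", 9999)
      .map(line => (line.split(",")(0), line))
    val views = ssc.socketTextStream("views-host", 9999)
      .map(line => (line.split(",")(0), line))
    val grouped = clicks.cogroup(views)   // DStream[(String, (Seq[String], Seq[String]))]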
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 +   */
 +  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
 +    join[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
 +   */
 +  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
 +    join[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 +   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 +   */
 +  def join[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (V, W))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
 +   * number of partitions.
 +   */
 +  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
 +    leftOuterJoin[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
 +   * partitions.
 +   */
 +  def leftOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      numPartitions: Int
 +    ): DStream[(K, (V, Option[W]))] = {
 +    leftOuterJoin[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
 +   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
 +   * the partitioning of each RDD.
 +   */
 +  def leftOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (V, Option[W]))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
 +    )
 +  }
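A sketch of leftOuterJoin using the hypothetical `views` and `clicks` streams from the cogroup
sketch; keys that appear only in `views` get None on the right-hand side (rightOuterJoin below
is the mirror image).

    val viewsWithClicks = views.leftOuterJoin(clicks)
    // DStream[(String, (String, Option[String]))]; keep the views that saw no click
    viewsWithClicks.filter { case (_, (_, click)) => click.isEmpty }.print()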
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
 +   * number of partitions.
 +   */
 +  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
 +    rightOuterJoin[W](other, defaultPartitioner())
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
 +   * partitions.
 +   */
 +  def rightOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      numPartitions: Int
 +    ): DStream[(K, (Option[V], W))] = {
 +    rightOuterJoin[W](other, defaultPartitioner(numPartitions))
 +  }
 +
 +  /**
 +   * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
 +   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
 +   * the partitioning of each RDD.
 +   */
 +  def rightOuterJoin[W: ClassTag](
 +      other: DStream[(K, W)],
 +      partitioner: Partitioner
 +    ): DStream[(K, (Option[V], W))] = {
 +    self.transformWith(
 +      other,
 +      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
 +    )
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
 +   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
 +   */
 +  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
 +      prefix: String,
 +      suffix: String
 +    )(implicit fm: ClassTag[F]) {
 +    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
 +   * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
 +   */
 +  def saveAsHadoopFiles(
 +      prefix: String,
 +      suffix: String,
 +      keyClass: Class[_],
 +      valueClass: Class[_],
 +      outputFormatClass: Class[_ <: OutputFormat[_, _]],
 +      conf: JobConf = new JobConf
 +    ) {
 +    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
 +    }
-     self.foreach(saveFunc)
++    self.foreachRDD(saveFunc)
 +  }
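A sketch of saving with the old mapred API, assuming the `windowedCounts` stream from the first
sketch; the values are converted to Hadoop Writables so that TextOutputFormat applies, and the
HDFS path is a placeholder. Each batch is written out as "prefix-TIME_IN_MS.suffix".

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat

    val writableCounts = windowedCounts.map { case (word, count) =>
      (new Text(word), new IntWritable(count))
    }
    writableCounts.saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]](
      "hdfs://namenode:8020/streaming/counts", "out")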
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file (using the new Hadoop API). The file name
 +   * at each batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
 +      prefix: String,
 +      suffix: String
 +    )(implicit fm: ClassTag[F])  {
 +    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
 +  }
 +
 +  /**
 +   * Save each RDD in `this` DStream as a Hadoop file (using the new Hadoop API). The file name
 +   * at each batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 +   */
 +  def saveAsNewAPIHadoopFiles(
 +      prefix: String,
 +      suffix: String,
 +      keyClass: Class[_],
 +      valueClass: Class[_],
 +      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
 +      conf: Configuration = new Configuration
 +    ) {
 +    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
 +      val file = rddToFileName(prefix, suffix, time)
 +      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
 +    }
-     self.foreach(saveFunc)
++    self.foreachRDD(saveFunc)
 +  }
 +
 +  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
 +
 +  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a4d0f9f,a477d20..f7f3346
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@@ -187,7 -186,7 +187,7 @@@ class StreamingContextSuite extends Fun
      ssc = new StreamingContext(master, appName, batchDuration)
      val inputStream = addInputStream(ssc)
      inputStream.map(x => { throw new TestException("error in map task"); x})
--               .foreach(_.count)
++               .foreachRDD(_.count)
  
      val exception = intercept[Exception] {
        ssc.start()