Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 21:18:24 UTC
[7/9] git commit: Merge remote-tracking branch 'apache/master' into dstream-move
Merge remote-tracking branch 'apache/master' into dstream-move
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/777c181d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/777c181d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/777c181d
Branch: refs/heads/master
Commit: 777c181d2f583570956724f9cbe20eb1dc7048f1
Parents: 034f89a 405bfe8
Author: Tathagata Das <ta...@gmail.com>
Authored: Sun Jan 12 21:59:51 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Sun Jan 12 21:59:51 2014 -0800
----------------------------------------------------------------------
docs/streaming-programming-guide.md | 4 +--
.../streaming/examples/RawNetworkGrep.scala | 2 +-
.../examples/RecoverableNetworkWordCount.scala | 2 +-
.../streaming/examples/TwitterAlgebirdCMS.scala | 4 +--
.../streaming/examples/TwitterAlgebirdHLL.scala | 4 +--
.../streaming/examples/TwitterPopularTags.scala | 4 +--
.../examples/clickstream/PageViewStream.scala | 2 +-
.../streaming/api/java/JavaDStreamLike.scala | 26 ++++++++++++++++--
.../spark/streaming/dstream/DStream.scala | 29 +++++++++++++++-----
.../dstream/PairDStreamFunctions.scala | 4 +--
.../spark/streaming/BasicOperationsSuite.scala | 2 +-
.../spark/streaming/StreamingContextSuite.scala | 2 +-
12 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/docs/streaming-programming-guide.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --cc examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9,d51e6e9..8c5d0bd
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@@ -82,7 -82,7 +82,7 @@@ object RecoverableNetworkWordCount
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-- wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
++ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
println("Appending to " + outputFile.getAbsolutePath)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 8014db6,0000000..a7c4cca
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@@ -1,742 -1,0 +1,757 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
++
++import scala.deprecated
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
+
- import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
++import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.Duration
+
-
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `org.apache.spark.streaming.StreamingContext._` is imported.
+ *
+ * Internally, each DStream is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
+
+abstract class DStream[T: ClassTag] (
+ @transient private[streaming] var ssc: StreamingContext
+ ) extends Serializable with Logging {
+
+ // =======================================================================
+ // Methods that should be implemented by subclasses of DStream
+ // =======================================================================
+
+ /** Time interval after which the DStream generates an RDD */
+ def slideDuration: Duration
+
+ /** List of parent DStreams on which this DStream depends */
+ def dependencies: List[DStream[_]]
+
+ /** Method that generates an RDD for the given time */
+ def compute (validTime: Time): Option[RDD[T]]
+
+ // =======================================================================
+ // Methods and fields available on all DStreams
+ // =======================================================================
+
+ // RDDs generated, marked as private[streaming] so that testsuites can access it
+ @transient
+ private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+
+ // Time zero for the DStream
+ private[streaming] var zeroTime: Time = null
+
+ // Duration for which the DStream will remember each RDD created
+ private[streaming] var rememberDuration: Duration = null
+
+ // Storage level of the RDDs in the stream
+ private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
+
+ // Checkpoint details
+ private[streaming] val mustCheckpoint = false
+ private[streaming] var checkpointDuration: Duration = null
+ private[streaming] val checkpointData = new DStreamCheckpointData(this)
+
+ // Reference to whole DStream graph
+ private[streaming] var graph: DStreamGraph = null
+
+ private[streaming] def isInitialized = (zeroTime != null)
+
+ // Duration for which the DStream requires its parent DStream to remember each RDD created
+ private[streaming] def parentRememberDuration = rememberDuration
+
+ /** Return the StreamingContext associated with this DStream */
+ def context = ssc
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(level: StorageLevel): DStream[T] = {
+ if (this.isInitialized) {
+ throw new UnsupportedOperationException(
+ "Cannot change storage level of an DStream after streaming context has started")
+ }
+ this.storageLevel = level
+ this
+ }
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): DStream[T] = persist()
+
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
+ def checkpoint(interval: Duration): DStream[T] = {
+ if (isInitialized) {
+ throw new UnsupportedOperationException(
+ "Cannot change checkpoint interval of an DStream after streaming context has started")
+ }
+ persist()
+ checkpointDuration = interval
+ this
+ }
+
+ /**
+ * Initialize the DStream by setting the "zero" time, based on which
+ * the validity of future times is calculated. This method also recursively initializes
+ * its parent DStreams.
+ */
+ private[streaming] def initialize(time: Time) {
+ if (zeroTime != null && zeroTime != time) {
+ throw new Exception("ZeroTime is already initialized to " + zeroTime
+ + ", cannot initialize it again to " + time)
+ }
+ zeroTime = time
+
+ // Set the checkpoint interval to be slideDuration or 10 seconds, whichever is larger
+ if (mustCheckpoint && checkpointDuration == null) {
+ checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
+ logInfo("Checkpoint interval automatically set to " + checkpointDuration)
+ }
+
+ // Set the minimum value of the rememberDuration if not already set
+ var minRememberDuration = slideDuration
+ if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
+ minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgotten
+ }
+ if (rememberDuration == null || rememberDuration < minRememberDuration) {
+ rememberDuration = minRememberDuration
+ }
+
+ // Initialize the dependencies
+ dependencies.foreach(_.initialize(zeroTime))
+ }
+
+ private[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
+ assert(
+ !mustCheckpoint || checkpointDuration != null,
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
+ " Please use DStream.checkpoint() to set the interval."
+ )
+
+ assert(
+ checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
+ "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
+ " or SparkContext.checkpoint() to set the checkpoint directory."
+ )
+
+ assert(
+ checkpointDuration == null || checkpointDuration >= slideDuration,
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+ checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
+ "Please set it to at least " + slideDuration + "."
+ )
+
+ assert(
+ checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
+ "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
+ checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
+ "Please set it to a multiple " + slideDuration + "."
+ )
+
+ assert(
+ checkpointDuration == null || storageLevel != StorageLevel.NONE,
+ "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
+ "level has not been set to enable persisting. Please use DStream.persist() to set the " +
+ "storage level to use memory for better checkpointing performance."
+ )
+
+ assert(
+ checkpointDuration == null || rememberDuration > checkpointDuration,
+ "The remember duration for " + this.getClass.getSimpleName + " has been set to " +
+ rememberDuration + " which is not more than the checkpoint interval (" +
+ checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
+ )
+
+ val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
+ assert(
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
+ "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+ "set the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
+ )
+
+ dependencies.foreach(_.validate())
+
+ logInfo("Slide time = " + slideDuration)
+ logInfo("Storage level = " + storageLevel)
+ logInfo("Checkpoint interval = " + checkpointDuration)
+ logInfo("Remember duration = " + rememberDuration)
+ logInfo("Initialized and validated " + this)
+ }
+
+ private[streaming] def setContext(s: StreamingContext) {
+ if (ssc != null && ssc != s) {
+ throw new Exception("Context is already set in " + this + ", cannot set it again")
+ }
+ ssc = s
+ logInfo("Set context for " + this)
+ dependencies.foreach(_.setContext(ssc))
+ }
+
+ private[streaming] def setGraph(g: DStreamGraph) {
+ if (graph != null && graph != g) {
+ throw new Exception("Graph is already set in " + this + ", cannot set it again")
+ }
+ graph = g
+ dependencies.foreach(_.setGraph(graph))
+ }
+
+ private[streaming] def remember(duration: Duration) {
+ if (duration != null && duration > rememberDuration) {
+ rememberDuration = duration
+ logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
+ }
+ dependencies.foreach(_.remember(parentRememberDuration))
+ }
+
+ /** Checks whether the 'time' is valid with respect to slideDuration for generating an RDD */
+ private[streaming] def isTimeValid(time: Time): Boolean = {
+ if (!isInitialized) {
+ throw new Exception (this + " has not been initialized")
+ } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
+ false
+ } else {
+ logDebug("Time " + time + " is valid")
+ true
+ }
+ }
+
+ /**
+ * Retrieve a precomputed RDD of this DStream, or compute the RDD. This is an internal
+ * method that should not be called directly.
+ */
+ private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+ // If this DStream was not initialized (i.e., zeroTime not set), then do it
+ // If RDD was already generated, then retrieve it from HashMap
+ generatedRDDs.get(time) match {
+
+ // If an RDD was already generated and is being reused, then
+ // probably all RDDs in this DStream will be reused and hence should be cached
+ case Some(oldRDD) => Some(oldRDD)
+
+ // if RDD was not generated, and if the time is valid
+ // (based on sliding time of this DStream), then generate the RDD
+ case None => {
+ if (isTimeValid(time)) {
+ compute(time) match {
+ case Some(newRDD) =>
+ if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
+ }
+ if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ newRDD.checkpoint()
+ logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
+ }
+ generatedRDDs.put(time, newRDD)
+ Some(newRDD)
+ case None =>
+ None
+ }
+ } else {
+ None
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate a SparkStreaming job for the given time. This is an internal method that
+ * should not be called directly. This default implementation creates a job
+ * that materializes the corresponding RDD. Subclasses of DStream may override this
+ * to generate their own jobs.
+ */
+ private[streaming] def generateJob(time: Time): Option[Job] = {
+ getOrCompute(time) match {
+ case Some(rdd) => {
+ val jobFunc = () => {
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ context.sparkContext.runJob(rdd, emptyFunc)
+ }
+ Some(new Job(time, jobFunc))
+ }
+ case None => None
+ }
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
+ */
+ private[streaming] def clearMetadata(time: Time) {
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearMetadata(time))
+ }
+
+ /* Adds metadata to the Stream while it is running.
+ * This method should be overridden by subclasses of InputDStream.
+ */
+ private[streaming] def addMetadata(metadata: Any) {
+ if (metadata != null) {
+ logInfo("Dropping Metadata: " + metadata.toString)
+ }
+ }
+
+ /**
+ * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
+ * this stream. This is an internal method that should not be called directly. This is
+ * a default implementation that saves only the file names of the checkpointed RDDs to
+ * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
+ * this method to save custom checkpoint data.
+ */
+ private[streaming] def updateCheckpointData(currentTime: Time) {
+ logDebug("Updating checkpoint data for time " + currentTime)
+ checkpointData.update(currentTime)
+ dependencies.foreach(_.updateCheckpointData(currentTime))
+ logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
+ }
+
+ private[streaming] def clearCheckpointData(time: Time) {
+ logDebug("Clearing checkpoint data")
+ checkpointData.cleanup(time)
+ dependencies.foreach(_.clearCheckpointData(time))
+ logDebug("Cleared checkpoint data")
+ }
+
+ /**
+ * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
+ * that should not be called directly. This is a default implementation that recreates RDDs
+ * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
+ * override the updateCheckpointData() method would also need to override this method.
+ */
+ private[streaming] def restoreCheckpointData() {
+ // Create RDDs from the checkpoint data
+ logInfo("Restoring checkpoint data")
+ checkpointData.restore()
+ dependencies.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
+ }
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ logDebug(this.getClass().getSimpleName + ".writeObject used")
+ if (graph != null) {
+ graph.synchronized {
+ if (graph.checkpointInProgress) {
+ oos.defaultWriteObject()
+ } else {
+ val msg = "Object of " + this.getClass.getName + " is being serialized " +
+ " possibly as a part of closure of an RDD operation. This is because " +
+ " the DStream object is being referred to from within the closure. " +
+ " Please rewrite the RDD operation inside this DStream to avoid this. " +
+ " This has been enforced to avoid bloating of Spark tasks " +
+ " with unnecessary objects."
+ throw new java.io.NotSerializableException(msg)
+ }
+ }
+ } else {
+ throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.")
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
+ ois.defaultReadObject()
+ generatedRDDs = new HashMap[Time, RDD[T]] ()
+ }
+
+ // =======================================================================
+ // DStream operations
+ // =======================================================================
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
+ new MappedDStream(this, context.sparkContext.clean(mapFunc))
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
+ new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
+ }
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
+ def glom(): DStream[Array[T]] = new GlommedDStream(this)
+
+
+ /**
+ * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+ * returned DStream has exactly numPartitions partitions.
+ */
+ def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U: ClassTag](
+ mapPartFunc: Iterator[T] => Iterator[U],
+ preservePartitioning: Boolean = false
+ ): DStream[U] = {
+ new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(reduceFunc: (T, T) => T): DStream[T] =
+ this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
+ def count(): DStream[Long] = {
+ this.map(_ => (null, 1L))
+ .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+ .reduceByKey(_ + _)
+ .map(_._2)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the counts of each distinct value in
+ * each RDD of this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ */
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
- def foreach(foreachFunc: RDD[T] => Unit) {
- this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
++ @deprecated("use foreachRDD", "0.9.0")
++ def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
++
++ /**
++ * Apply a function to each RDD in this DStream. This is an output operator, so
++ * 'this' DStream will be registered as an output stream and therefore materialized.
++ */
++ @deprecated("use foreachRDD", "0.9.0")
++ def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
++
++ /**
++ * Apply a function to each RDD in this DStream. This is an output operator, so
++ * 'this' DStream will be registered as an output stream and therefore materialized.
++ */
++ def foreachRDD(foreachFunc: RDD[T] => Unit) {
++ this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
- def foreach(foreachFunc: (RDD[T], Time) => Unit) {
++ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
+ ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream.
+ */
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream.
+ */
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 1)
+ cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+ }
+ new TransformedDStream[U](Seq(this), realTransformFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U: ClassTag, V: ClassTag](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U: ClassTag, V: ClassTag](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 2)
+ val rdd1 = rdds(0).asInstanceOf[RDD[T]]
+ val rdd2 = rdds(1).asInstanceOf[RDD[U]]
+ cleanedF(rdd1, rdd2, time)
+ }
+ new TransformedDStream[V](Seq(this, other), realTransformFunc)
+ }
+
+ /**
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
+ def print() {
+ def foreachFunc = (rdd: RDD[T], time: Time) => {
+ val first11 = rdd.take(11)
+ println ("-------------------------------------------")
+ println ("Time: " + time)
+ println ("-------------------------------------------")
+ first11.take(10).foreach(println)
+ if (first11.size > 10) println("...")
+ println()
+ }
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
+ ssc.registerOutputStream(newStream)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream. The new DStream generates RDDs with
+ * the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ */
+ def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
+
+ /**
+ * Return a new DStream in which each RDD contains all the elements seen in a
+ * sliding window of time over this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ new WindowedDStream(this, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream. However, the reduction is done incrementally
+ * using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ invReduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
+ this.map(x => (1, x))
+ .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
+ .map(_._2)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD contains the count of distinct elements in
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
+ * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+ * `numPartitions` not specified).
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def countByValueAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int = ssc.sc.defaultParallelism
+ ): DStream[(T, Long)] = {
+
+ this.map(x => (x, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowDuration,
+ slideDuration,
+ numPartitions,
+ (x: (T, Long)) => x._2 != 0L
+ )
+ }
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same slideDuration as this DStream.
+ */
+ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
+
+ /**
+ * Return all the RDDs defined by the Interval object (both end times included)
+ */
+ def slice(interval: Interval): Seq[RDD[T]] = {
+ slice(interval.beginTime, interval.endTime)
+ }
+
+ /**
+ * Return all the RDDs between 'fromTime' and 'toTime' (both included)
+ */
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
+ }
+
+ /**
+ * Save each RDD in this DStream as a Sequence file of serialized objects.
+ * The file name at each batch interval is generated based on `prefix` and
+ * `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsObjectFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsObjectFile(file)
+ }
- this.foreach(saveFunc)
++ this.foreachRDD(saveFunc)
+ }
+
+ /**
+ * Save each RDD in this DStream as a text file, using string representation
+ * of elements. The file name at each batch interval is generated based on
+ * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsTextFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsTextFile(file)
+ }
- this.foreach(saveFunc)
++ this.foreachRDD(saveFunc)
+ }
+
+ def register() {
+ ssc.registerOutputStream(this)
+ }
+}
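Taken together, the class above distinguishes lazy transformations (map, flatMap, window, transform), the pair operations made available through the implicit conversions imported from StreamingContext._, and output operators (print, foreachRDD, saveAs*Files, register) that cause RDDs to actually be computed each batch. A hedged sketch of how these pieces combine; all endpoints and names are assumed for illustration only:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // implicit conversion to PairDStreamFunctions

// Sketch only: chains transformations and windowing, then registers one output.
object DStreamOpsSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "DStreamOpsSketch", Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformations are lazy; they only describe how each batch's RDD is built.
    val words = lines.flatMap(_.split(" "))

    // window() collects the last 10 seconds of data, sliding every 2 seconds;
    // both durations must be multiples of the batch interval.
    val recent = words.window(Seconds(10), Seconds(2))

    // countByValue() is built from map + reduceByKey, as in the class above.
    val counts = recent.countByValue()

    // print() is an output operator, so this DStream gets registered and computed.
    counts.print()

    ssc.start()
  }
}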
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f71dd17,0000000..6b3e483
mode 100644,000000..100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@@ -1,622 -1,0 +1,622 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream._
+
+import org.apache.spark.{Partitioner, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
+import org.apache.spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{ClassTag, classTag}
+
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.{Time, Duration}
+
+class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
+extends Serializable {
+
+ private[streaming] def ssc = self.ssc
+
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+ new HashPartitioner(numPartitions)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ */
+ def groupByKey(): DStream[(K, Seq[V])] = {
+ groupByKey(defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ */
+ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
+ groupByKey(defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
+ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+ val createCombiner = (v: V) => ArrayBuffer[V](v)
+ val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
+ val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
+ combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
+ reduceByKey(reduceFunc, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
+ reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
+ }
+
+ /**
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in
+ * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ */
+ def combineByKey[C: ClassTag](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): DStream[(K, C)] = {
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+ * then Spark's default number of partitions will be used
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): DStream[(K, Seq[V])] = {
+ val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+ val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+ val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+ self.groupByKey(partitioner)
+ .window(windowDuration, slideDuration)
+ .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions number of partitions of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): DStream[(K, V)] = {
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD
+ * in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): DStream[(K, V)] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ self.reduceByKey(cleanedReduceFunc, partitioner)
+ .window(windowDuration, slideDuration)
+ .reduceByKey(cleanedReduceFunc, partitioner)
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value over a new window is calculated using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ *
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ *
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration = self.slideDuration,
+ numPartitions: Int = ssc.sc.defaultParallelism,
+ filterFunc: ((K, V)) => Boolean = null
+ ): DStream[(K, V)] = {
+
+ reduceByKeyAndWindow(
+ reduceFunc, invReduceFunc, windowDuration,
+ slideDuration, defaultPartitioner(numPartitions), filterFunc
+ )
+ }
+
+ /**
+ * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value over a new window is calculated using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param filterFunc Optional function to filter expired key-value pairs;
+ * only pairs that satisfy the function are retained
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner,
+ filterFunc: ((K, V)) => Boolean
+ ): DStream[(K, V)] = {
+
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+ val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
+ new ReducedWindowedDStream[K, V](
+ self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+ windowDuration, slideDuration, partitioner
+ )
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassTag](
+ updateFunc: (Seq[V], Option[S]) => Option[S]
+ ): DStream[(K, S)] = {
+ updateStateByKey(updateFunc, defaultPartitioner())
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassTag](
+ updateFunc: (Seq[V], Option[S]) => Option[S],
+ numPartitions: Int
+ ): DStream[(K, S)] = {
+ updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassTag](
+ updateFunc: (Seq[V], Option[S]) => Option[S],
+ partitioner: Partitioner
+ ): DStream[(K, S)] = {
+ val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
+ iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+ }
+ updateStateByKey(newUpdateFunc, partitioner, true)
+ }
+
+ /**
+ * Return a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If `this` function returns None, then
+ * corresponding state key-value pair will be eliminated. Note that
+ * this function may generate a tuple with a different key
+ * than the input key. It is up to the developer to decide whether to
+ * remember the partitioner despite the key being changed.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassTag](
+ updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
+ partitioner: Partitioner,
+ rememberPartitioner: Boolean
+ ): DStream[(K, S)] = {
+ new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
+ }
+
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
+ new MapValuedDStream[K, V, U](self, mapValuesFunc)
+ }
+
+ /**
+ * Return a new DStream by applying a flatMap function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
+ def flatMapValues[U: ClassTag](
+ flatMapValuesFunc: V => TraversableOnce[U]
+ ): DStream[(K, U)] = {
+ new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
+ * of partitions.
+ */
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+ */
+ def cogroup[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Seq[V], Seq[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ */
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
+ def join[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, W))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, Option[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
+ def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
+ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Option[V], W))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassTag[F]) {
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
+ * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf = new JobConf
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
- self.foreach(saveFunc)
++ self.foreachRDD(saveFunc)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassTag[F]) {
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
- self.foreach(saveFunc)
++ self.foreachRDD(saveFunc)
+ }
+
+ private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+
+ private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+}
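To illustrate the pair operations above, a hedged sketch combining the incremental reduceByKeyAndWindow (with an inverse reduce function) and updateStateByKey; the checkpoint directory, host and port are placeholders, and both operations need a checkpoint directory to be set:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Sketch only: windowed counts maintained incrementally, plus per-key running totals.
object PairOpsSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "PairOpsSketch", Seconds(1))
    ssc.checkpoint("checkpoint")  // required by the invertible window reduce and updateStateByKey

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Incremental window reduce: add counts entering the window and subtract
    // counts leaving it, rather than re-reducing the whole window each slide.
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))

    // Running total per key; returning None from the update function would drop the key.
    val totals = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))
    }

    windowedCounts.print()
    totals.print()
    ssc.start()
  }
}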
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/777c181d/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --cc streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a4d0f9f,a477d20..f7f3346
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@@ -187,7 -186,7 +187,7 @@@ class StreamingContextSuite extends Fun
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
-- .foreach(_.count)
++ .foreachRDD(_.count)
val exception = intercept[Exception] {
ssc.start()