You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/24 02:03:59 UTC

git commit: [SPARK-1540] Add an optional Ordering parameter to PairRDDFunctions.

Repository: spark
Updated Branches:
  refs/heads/master 432201c7e -> 640f9a0ef


[SPARK-1540] Add an optional Ordering parameter to PairRDDFunctions.

In https://issues.apache.org/jira/browse/SPARK-1540 we'd like to look at Spark's API to see if we can take advantage of Comparable keys in more places, which will make external spilling more efficient. This PR is a first step towards that: it shows how to pass an Ordering when one is available and still continue functioning otherwise. It does this using a new implicit parameter with a default value of null.

The API is currently only in Scala -- in Java we'd have to add new versions of mapToPair and such that take a Comparator, or a new method to add a "type hint" to an RDD. We can address those later though.

Unfortunately, requiring all keys to be Comparable would not work without requiring RDDs in general to contain only Comparable types. The reason is that methods such as distinct() and intersection() do a shuffle, but should be usable on RDDs of any type. So ordering will have to remain an optimization for the types that can be ordered. I think this isn't a horrible outcome, though, because one of the nice things about Spark's API is that it works on objects of *any* type, without requiring you to specify a schema or implement Writable or stuff like that.

Author: Matei Zaharia <ma...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Reynold Xin <rx...@apache.org>

Closes #487 from mateiz/ordered-keys and squashes the following commits:

bd565f6 [Matei Zaharia] Pass an Ordering to only one version of groupBy because the Scala language spec doesn't allow having an optional parameter on all of them (this was only compiling in Scala 2.10 due to a bug).
4629965 [Matei Zaharia] Add tests for other versions of groupBy
3beae85 [Matei Zaharia] Added a test for implicit orderings
80b7a3b [Matei Zaharia] Add an optional Ordering parameter to PairRDDFunctions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/640f9a0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/640f9a0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/640f9a0e

Branch: refs/heads/master
Commit: 640f9a0efefd42cff86aecd4878a3a57f5ae85fa
Parents: 432201c
Author: Matei Zaharia <ma...@databricks.com>
Authored: Wed Apr 23 17:03:54 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Apr 23 17:03:54 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 29 +++++-----
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 46 +++++++++-------
 .../spark/rdd/SequenceFileRDDFunctions.scala    |  4 +-
 .../apache/spark/ImplicitOrderingSuite.scala    | 57 ++++++++++++++++++++
 .../spark/streaming/StreamingContext.scala      |  5 +-
 .../spark/streaming/dstream/DStream.scala       | 10 ++--
 .../dstream/PairDStreamFunctions.scala          | 18 +++----
 8 files changed, 124 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb6b68..e9d2f57 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1281,8 +1281,10 @@ object SparkContext extends Logging {
 
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
 
-  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
+  }
 
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d250bef..d2b9ee4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -52,11 +52,12 @@ import org.apache.spark.util.SerializableHyperLogLog
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
  */
-class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
+class PairRDDFunctions[K, V](self: RDD[(K, V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
   extends Logging
   with SparkHadoopMapReduceUtil
-  with Serializable {
-
+  with Serializable
+{
   /**
    * Generic function to combine the elements for each key using a custom set of aggregation
    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -77,7 +78,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       mapSideCombine: Boolean = true,
       serializer: Serializer = null): RDD[(K, C)] = {
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
-    if (getKeyClass().isArray) {
+    if (keyClass.isArray) {
       if (mapSideCombine) {
         throw new SparkException("Cannot use map-side combining with array keys.")
       }
@@ -170,7 +171,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
 
-    if (getKeyClass().isArray) {
+    if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
     }
 
@@ -288,7 +289,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     if (self.partitioner == partitioner) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
@@ -458,7 +459,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W]))]  = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
@@ -473,7 +474,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
@@ -573,7 +574,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * supporting the key and value types K and V in this RDD.
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -584,7 +585,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
       path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
     val runtimeClass = fm.runtimeClass
-    saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
+    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
 
   /**
@@ -592,7 +593,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
    */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
-    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -782,7 +783,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def values: RDD[V] = self.map(_._2)
 
-  private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+  private[spark] def keyClass: Class[_] = kt.runtimeClass
+
+  private[spark] def valueClass: Class[_] = vt.runtimeClass
 
-  private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+  private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 596dcb8..6c897cc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -284,7 +284,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int): RDD[T] =
+  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 
   /**
@@ -301,7 +301,7 @@ abstract class RDD[T: ClassTag](
    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    * which can avoid performing a shuffle.
    */
-  def repartition(numPartitions: Int): RDD[T] = {
+  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
     coalesce(numPartitions, shuffle = true)
   }
 
@@ -325,7 +325,8 @@ abstract class RDD[T: ClassTag](
    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
    * data distributed using a hash partitioner.
    */
-  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
+      : RDD[T] = {
     if (shuffle) {
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
@@ -424,10 +425,11 @@ abstract class RDD[T: ClassTag](
    *
    * Note that this method performs a shuffle internally.
    */
-  def intersection(other: RDD[T]): RDD[T] =
+  def intersection(other: RDD[T]): RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -437,10 +439,12 @@ abstract class RDD[T: ClassTag](
    *
    * @param partitioner Partitioner to use for the resulting RDD
    */
-  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T] =
+  def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null)
+      : RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -450,10 +454,11 @@ abstract class RDD[T: ClassTag](
    *
    * @param numPartitions How many partitions to use in the resulting RDD
    */
-  def intersection(other: RDD[T], numPartitions: Int): RDD[T] =
+  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions))
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
         .keys
+  }
 
   /**
    * Return an RDD created by coalescing all elements within each partition into an array.
@@ -467,22 +472,25 @@ abstract class RDD[T: ClassTag](
   def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
   /**
-   * Return an RDD of grouped items.
+   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+   * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
+  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
+  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
-   * Return an RDD of grouped items.
+   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+   * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
+  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
+      : RDD[(K, Iterable[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
@@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
-  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+  def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
     if (partitioner == Some(p)) {
       // Our partitioner knows how to handle T (which, since we have a partitioner, is
       // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -847,7 +855,7 @@ abstract class RDD[T: ClassTag](
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): Map[T, Long] = {
+  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValue() does not support arrays")
     }
@@ -877,10 +885,10 @@ abstract class RDD[T: ClassTag](
    * Approximate version of countByValue().
    */
   @Experimental
-  def countByValueApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-      ): PartialResult[Map[T, BoundedDouble]] = {
+  def countByValueApprox(timeout: Long, confidence: Double = 0.95)
+      (implicit ord: Ordering[T] = null)
+      : PartialResult[Map[T, BoundedDouble]] =
+  {
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
@@ -1030,13 +1038,13 @@ abstract class RDD[T: ClassTag](
    * Returns the max of this RDD as defined by the implicit Ordering[T].
    * @return the maximum element of the RDD
    * */
-  def max()(implicit ord: Ordering[T]):T = this.reduce(ord.max)
+  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
 
   /**
    * Returns the min of this RDD as defined by the implicit Ordering[T].
    * @return the minimum element of the RDD
    * */
-  def min()(implicit ord: Ordering[T]):T = this.reduce(ord.min)
+  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
    * Save this RDD as a text file, using string representations of elements.

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 7df9a29..9a1efc8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -68,8 +68,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
 
     val keyClass = getWritableClass[K]
     val valueClass = getWritableClass[V]
-    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+    val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
+    val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
 
     logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
       valueClass.getSimpleName + ")" )

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
new file mode 100644
index 0000000..4bd8891
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+
+class ImplicitOrderingSuite extends FunSuite with LocalSparkContext {
+  class NonOrderedClass {}
+
+  class ComparableClass extends Comparable[ComparableClass] {
+    override def compareTo(o: ComparableClass): Int = ???
+  }
+
+  class OrderedClass extends Ordered[OrderedClass] {
+    override def compare(o: OrderedClass): Int = ???
+  }
+
+  // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should.
+  test("basic inference of Orderings"){
+    sc = new SparkContext("local", "test")
+    val rdd = sc.parallelize(1 to 10)
+
+    // Infer orderings after basic maps to particular types
+    assert(rdd.map(x => (x, x)).keyOrdering.isDefined)
+    assert(rdd.map(x => (1, x)).keyOrdering.isDefined)
+    assert(rdd.map(x => (x.toString, x)).keyOrdering.isDefined)
+    assert(rdd.map(x => (null, x)).keyOrdering.isDefined)
+    assert(rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty)
+    assert(rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined)
+    assert(rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined)
+
+    // Infer orderings for other RDD methods
+    assert(rdd.groupBy(x => x).keyOrdering.isDefined)
+    assert(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty)
+    assert(rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined)
+    assert(rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined)
+    assert(rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined)
+    assert(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9ba6e02..1c89543 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -495,7 +495,10 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
-  implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
+  private[streaming] val DEFAULT_CLEANER_TTL = 3600
+
+  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairDStreamFunctions[K, V](stream)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7e5215..d393cc0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -488,7 +488,8 @@ abstract class DStream[T: ClassTag] (
    * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
    * `numPartitions` not specified).
    */
-  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
+      : DStream[(T, Long)] =
     this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
 
   /**
@@ -686,9 +687,10 @@ abstract class DStream[T: ClassTag] (
   def countByValueAndWindow(
       windowDuration: Duration,
       slideDuration: Duration,
-      numPartitions: Int = ssc.sc.defaultParallelism
-    ): DStream[(T, Long)] = {
-
+      numPartitions: Int = ssc.sc.defaultParallelism)
+      (implicit ord: Ordering[T] = null)
+      : DStream[(T, Long)] =
+  {
     this.map(x => (x, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,

http://git-wip-us.apache.org/repos/asf/spark/blob/640f9a0e/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 354bc13..826bf39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -37,13 +37,13 @@ import org.apache.spark.streaming.{Time, Duration}
  * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
  * these functions.
  */
-class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-  extends Serializable {
-
+class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
+  extends Serializable
+{
   private[streaming] def ssc = self.ssc
 
-  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
-  = {
+  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
   }
 
@@ -576,7 +576,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F]) {
-    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+    saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
       fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
@@ -607,7 +607,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F])  {
-    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+    saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
       fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
@@ -630,7 +630,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
     self.foreachRDD(saveFunc)
   }
 
-  private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+  private def keyClass: Class[_] = kt.runtimeClass
 
-  private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+  private def valueClass: Class[_] = vt.runtimeClass
 }