You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/03 00:09:54 UTC

spark git commit: [SPARK-3325][Streaming] Add a parameter to the method print in class DStream

Repository: spark
Updated Branches:
  refs/heads/master 012839807 -> bd88b7185


[SPARK-3325][Streaming] Add a parameter to the method print in class DStream

This PR is a fixed version of the original PR #3237 by watermen and scwf.
This adds the ability to specify how many elements to print in `DStream.print`.

Author: Yadong Qi <qi...@gmail.com>
Author: q00251598 <qi...@huawei.com>
Author: Tathagata Das <ta...@gmail.com>
Author: wangfei <wa...@huawei.com>

Closes #3865 from tdas/print-num and squashes the following commits:

cd34e9e [Tathagata Das] Fix bug
7c09f16 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into HEAD
bb35d1a [Yadong Qi] Update MimaExcludes.scala
f8098ca [Yadong Qi] Update MimaExcludes.scala
f6ac3cb [Yadong Qi] Update MimaExcludes.scala
e4ed897 [Yadong Qi] Update MimaExcludes.scala
3b9d5cf [wangfei] fix conflicts
ec8a3af [q00251598] move to  Spark 1.3
26a70c0 [q00251598] extend the Python DStream's print
b589a4b [q00251598] add another print function


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd88b718
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd88b718
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd88b718

Branch: refs/heads/master
Commit: bd88b7185358ae60efc83dc6cbb3fb1d2bff6074
Parents: 0128398
Author: Yadong Qi <qi...@gmail.com>
Authored: Fri Jan 2 15:09:41 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jan 2 15:09:41 2015 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                            |  3 +++
 python/pyspark/streaming/dstream.py                   | 12 +++++++-----
 .../spark/streaming/api/java/JavaDStreamLike.scala    | 10 +++++++++-
 .../org/apache/spark/streaming/dstream/DStream.scala  | 14 +++++++++++---
 4 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd88b718/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c377e5c..31d4c31 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,6 +54,9 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.linalg.Matrices.rand")
           ) ++ Seq(
+            // SPARK-3325
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaDStreamLike.print"),
             // SPARK-2757
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +

http://git-wip-us.apache.org/repos/asf/spark/blob/bd88b718/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 0826ddc..2fe3939 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -157,18 +157,20 @@ class DStream(object):
         api = self._ssc._jvm.PythonDStream
         api.callForeachRDD(self._jdstream, jfunc)
 
-    def pprint(self):
+    def pprint(self, num=10):
         """
-        Print the first ten elements of each RDD generated in this DStream.
+        Print the first num elements of each RDD generated in this DStream.
+
+        @param num: the number of elements from the first will be printed.
         """
         def takeAndPrint(time, rdd):
-            taken = rdd.take(11)
+            taken = rdd.take(num + 1)
             print "-------------------------------------------"
             print "Time: %s" % time
             print "-------------------------------------------"
-            for record in taken[:10]:
+            for record in taken[:num]:
                 print record
-            if len(taken) > 10:
+            if len(taken) > num:
                 print "..."
             print
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd88b718/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2a7004e..e0542ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -51,7 +51,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print(): Unit = {
-    dstream.print()
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int): Unit = {
+    dstream.print(num)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bd88b718/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 7f8651e..28fc00c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -605,13 +605,21 @@ abstract class DStream[T: ClassTag] (
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print() {
+    print(10)
+  }
+
+  /**
+   * Print the first num elements of each RDD generated in this DStream. This is an output
+   * operator, so this DStream will be registered as an output stream and there materialized.
+   */
+  def print(num: Int) {
     def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val first11 = rdd.take(11)
+      val firstNum = rdd.take(num + 1)
       println ("-------------------------------------------")
       println ("Time: " + time)
       println ("-------------------------------------------")
-      first11.take(10).foreach(println)
-      if (first11.size > 10) println("...")
+      firstNum.take(num).foreach(println)
+      if (firstNum.size > num) println("...")
       println()
     }
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org