Posted to issues@spark.apache.org by "宿荣全 (JIRA)" <ji...@apache.org> on 2014/12/11 06:35:15 UTC
[jira] [Comment Edited] (SPARK-4817) [streaming] Print the specified number of elements and handle all of the elements in the RDD
[ https://issues.apache.org/jira/browse/SPARK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14242177#comment-14242177 ]
宿荣全 edited comment on SPARK-4817 at 12/11/14 5:35 AM:
------------------------------------------------------
[~srowen]
We can always call foreachRDD, operate on all of the RDD, and then call take on the RDD to get a few elements to print. That achieves the effect, but it is more complicated.
For example:
1. stream.map(...).filter(...).foreachRDD { rdd =>
     // collect() pulls the entire RDD back to the driver
     val array = rdd.collect()
     // take(5) already returns the whole array when it holds fewer than 5 elements
     val result = array.take(5)
     result.foreach(println)
   }
2. stream.map(...).filter(...).foreachRDD { rdd =>
     // run a job that materializes each partition as an array on the driver
     val partitions = ssc.sparkContext.runJob(rdd, (iter: Iterator[(String, String)]) => iter.toArray)
     val array = Array.concat(partitions: _*)
     val result = array.take(5)
     result.foreach(println)
   }
These two samples achieve the effect, but from a design perspective, manipulating the RDD directly in streaming code is not good design, and I think 'foreachRDD' is generally not called directly in application code. A simpler variant that avoids collecting the whole RDD onto the driver is sketched below.
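A minimal sketch, assuming the same (String, String) pair stream as in the samples above; updateDatabase is a hypothetical placeholder for the real per-element work:

def updateDatabase(pair: (String, String)): Unit = { /* hypothetical side effect */ }

stream.foreachRDD { rdd =>
  // run the real work over every element of the RDD
  rdd.foreach(updateDatabase)
  // take(5) ships only 5 elements to the driver, unlike collect()
  rdd.take(5).foreach(println)
}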
Generally, a streaming action is registered through the following methods, all of which call 'foreachRDD':
1.DStream.foreach
2.DStream.saveAsObjectFiles
3.DStream.saveAsTextFiles
4.PairDStreamFunctions.saveAsHadoopFiles
5.PairDStreamFunctions.saveAsNewAPIHadoopFiles
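For comparison, a minimal sketch of what the proposed function could look like, built only on 'foreachRDD'; the name printAll and its body are an illustration, not the actual patch attached to this issue:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not Spark API: print `num` elements per batch while
// still evaluating every element of the RDD.
def printAll[T](dstream: DStream[T], num: Int): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.cache()                      // keep the data so take() does not recompute it
    rdd.foreach(_ => ())             // force evaluation of all elements
    rdd.take(num).foreach(println)   // print only the first `num` on the driver
    rdd.unpersist()
  }
}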
> [streaming] Print the specified number of elements and handle all of the elements in the RDD
> -----------------------------------------------------------------------------------
>
> Key: SPARK-4817
> URL: https://issues.apache.org/jira/browse/SPARK-4817
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: 宿荣全
> Priority: Minor
>
> The Dstream.print function prints 10 elements but handles 11 elements.
> A new function based on Dstream.print is proposed:
> print the specified number of elements while handling all of the elements in the RDD.
> There is a work scene:
> val dstream = stream.map->filter->mapPartitions->print
> The data after the filter needs to update a database inside mapPartitions, but there is no need to print every record; only the top 20 need to be printed to view the data processing.
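The reason plain print does not fit this scene: DStream.print is built on RDD.take, and take evaluates only as many partitions as it needs, so the side effects inside mapPartitions never run for the rest of the data. A sketch of the pitfall, with updateDatabase again as a hypothetical placeholder:

def updateDatabase(r: (String, String)): Unit = { /* hypothetical side effect */ }

stream.mapPartitions { iter =>
  iter.map { r => updateDatabase(r); r }   // side effect runs as elements are consumed
}.print()
// print() takes only the first few elements, so partitions beyond those
// needed are never computed and updateDatabase never runs on their data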
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org