Posted to user@spark.apache.org by Hafiz Mujadid <ha...@gmail.com> on 2014/12/03 14:45:58 UTC

converting DStream[String] into RDD[String] in spark streaming

Hi everyone!

I want to convert a DStream[String] into an RDD[String]. I could not find
how to do this.

    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))

Now I want to convert this stream into a single RDD[String].


Any help please.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: converting DStream[String] into RDD[String] in spark streaming

Posted by Sean Owen <so...@cloudera.com>.
On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar <de...@db.com> wrote:
> 1) If there are no sliding window calls in this streaming context, will
> there be just one file written per interval?

As many files as there are partitions will be written in each interval.

> 2) If there is a sliding window call in the same context, such as
>
>     val hashTags = stream.flatMap(json =>
>       DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
>
>     val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
>                      .map{case (topic, count) => (count, topic)}
>                      .transform(_.sortByKey(false))
>
> will some files get written multiple times (as long as the interval is
> in the batch)?

I don't think it's right to say files will be written many times, but
yes it is my understanding that data will be written many times since
a datum lies in many windows.
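
To make the point about overlapping windows concrete, here is a small
plain-Scala sketch (no Spark needed). It assumes the 30-second batch interval
and the 600-second window from the question, and that the window slides by one
batch interval, which is what reduceByKeyAndWindow does when no slide duration
is passed. The names WindowOverlap, windowsPerBatch and coveringWindowEnds are
made up for illustration.

```scala
object WindowOverlap {
  // Number of sliding windows that contain any single batch, assuming the
  // window length is a whole multiple of the slide interval.
  def windowsPerBatch(windowSeconds: Int, slideSeconds: Int): Int = {
    require(windowSeconds % slideSeconds == 0, "window must be a multiple of slide")
    windowSeconds / slideSeconds
  }

  // End times (in seconds) of every window that covers the batch ending at
  // `batchEnd`. A window ending at T covers the range (T - windowSeconds, T].
  def coveringWindowEnds(batchEnd: Int, windowSeconds: Int, slideSeconds: Int): Seq[Int] =
    batchEnd until (batchEnd + windowSeconds) by slideSeconds

  def main(args: Array[String]): Unit = {
    // Assumed values: 600 s window, 30 s batch interval and slide.
    println(windowsPerBatch(600, 30)) // prints 20
    println(coveringWindowEnds(30, 600, 30).take(3).mkString(",")) // prints 30,60,90
  }
}
```

Under those assumptions every datum falls into 20 consecutive windows, so any
output computed from the windowed stream reflects it 20 times, even though
each output directory is written only once.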



Re: converting DStream[String] into RDD[String] in spark streaming

Posted by "deenar.toraskar" <de...@db.com>.
Sean

DStream.saveAsTextFiles internally calls foreachRDD and saveAsTextFile for
each interval:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

    val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

1) If there are no sliding window calls in this streaming context, will
there be just one file written per interval?
2) If there is a sliding window call in the same context, such as

    val hashTags = stream.flatMap(json =>
      DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))

will some files get written multiple times (as long as the interval is
in the batch)?

Deenar

> DStream.foreachRDD gives you an RDD[String] for each interval of
> course. I don't think it makes sense to say a DStream can be converted
> into one RDD since it is a stream. The past elements are inherently
> not supposed to stick around for a long time, and future elements
> aren't known. You may consider saving each RDD[String] to HDFS, and
> then simply loading it from HDFS as an RDD[String].



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p22175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: converting DStream[String] into RDD[String] in spark streaming

Posted by Sean Owen <so...@cloudera.com>.
DStream.foreachRDD gives you an RDD[String] for each interval of
course. I don't think it makes sense to say a DStream can be converted
into one RDD since it is a stream. The past elements are inherently
not supposed to stick around for a long time, and future elements
aren't known. You may consider saving each RDD[String] to HDFS, and
then simply loading it from HDFS as an RDD[String].
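
A minimal sketch of that save-then-load approach, assuming Spark Streaming is
on the classpath. The object name SaveThenLoad, the helpers saveEachInterval
and loadAll, the baseDir parameter and the "batch-" directory prefix are all
hypothetical choices for illustration, not part of Spark.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object SaveThenLoad {
  // Streaming side: write each interval's RDD[String] to its own directory
  // under `baseDir` (hypothetical location, e.g. an HDFS path).
  def saveEachInterval(streams: DStream[String], baseDir: String): Unit =
    streams.foreachRDD { (rdd: RDD[String], time: Time) =>
      rdd.saveAsTextFile(s"$baseDir/batch-${time.milliseconds}")
    }

  // Batch side: read every saved interval back as a single RDD[String].
  // The glob matches the per-interval directories written above.
  def loadAll(sc: SparkContext, baseDir: String): RDD[String] =
    sc.textFile(s"$baseDir/batch-*")
}
```

saveEachInterval has to be called before the StreamingContext is started, and
loadAll only sees the intervals that have been fully written at the time it
runs. DStream.saveAsTextFiles(prefix) is a built-in alternative to the
foreachRDD call here.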

On Wed, Dec 3, 2014 at 7:45 AM, Hafiz Mujadid <ha...@gmail.com> wrote:
> Hi everyone!
>
> I want to convert a DStream[String] into an RDD[String]. I could not find
> how to do this.
>
>     var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
>       ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
>     val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
>
> Now I want to convert this stream into a single RDD[String].
>
>
> Any help please.



Re: converting DStream[String] into RDD[String] in spark streaming

Posted by Hafiz Mujadid <ha...@gmail.com>.
Thanks, dear. It is good to save this data to HDFS and then load it back into
an RDD :)



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p20258.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
