You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by coolgar <ka...@gmail.com> on 2016/11/07 22:22:49 UTC

Using Apache Spark Streaming - how to handle changing data format within stream

I'm using apache spark streaming with the kafka direct consumer. The data
stream I'm receiving is log data that includes a header with each block of
messages. Each DStream can therefore have many blocks of messages, each with
it's own header. 

The header is used to know how to interpret the following fields in the
block of messages. My challenge is that I'm building up (K,V) pairs that are
processed by reduceByKey() and I use this header to know how to parse the
fields that follow the header into the (K,V) pairs.
 
So each message received by kakfa may appear as follows (# denotes the
header field, \n denotes new line):
#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
field6 field7\data4 data5 data6 data7\n...

Is there a way, without collecting all data back to the driver, to "grab"
the header and use it to subsequently process the messages that follow the
header until a new #fields comes along, rinse, repeat? 





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Using Apache Spark Streaming - how to handle changing data format within stream

Posted by Cody Koeninger <co...@koeninger.org>.
I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

so something like (pseudocode):

stream.flatMap { message =>
  val items = new ArrayBuffer
 var parser = null
  message.split("\n").foreach { line =>
     if  // it's a header
        parser = someParserBasedOn(line)
    else
       items += parser.parse(line)
 }
 items.iterator
}

On Mon, Nov 7, 2016 at 4:22 PM, coolgar <ka...@gmail.com> wrote:
> I'm using apache spark streaming with the kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each with
> it's own header.
>
> The header is used to know how to interpret the following fields in the
> block of messages. My challenge is that I'm building up (K,V) pairs that are
> processed by reduceByKey() and I use this header to know how to parse the
> fields that follow the header into the (K,V) pairs.
>
> So each message received by kakfa may appear as follows (# denotes the
> header field, \n denotes new line):
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\data4 data5 data6 data7\n...
>
> Is there a way, without collecting all data back to the driver, to "grab"
> the header and use it to subsequently process the messages that follow the
> header until a new #fields comes along, rinse, repeat?
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Using Apache Spark Streaming - how to handle changing data format within stream

Posted by coolgar <ka...@gmail.com>.
Solution provided by Cody K :

I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

so something like (pseudocode):

stream.flatMap { message =>
  val items = new ArrayBuffer
 var parser = null
  message.split("\n").foreach { line =>
     if  // it's a header
        parser = someParserBasedOn(line)
    else
       items += parser.parse(line)
 }
 items.iterator
}




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037p28054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org