You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Mike Hugo <mi...@piragua.com> on 2017/06/16 20:01:53 UTC

Spark SQL within a DStream map function

Hello,

I have a web application that publishes JSON messages on to a messaging
queue that contain metadata and a link to a CSV document on S3.  I'd like
to iterate over these JSON messages, and for each one pull the CSV document
into spark SQL to transform it (based on the metadata in the JSON message)
and output the results to a search index.  Each file on S3 has different
headers, potentially different delimiters, and differing numbers of rows.

Basically what I'm trying to do is something like this:

        JavaDStream<ParsedDocument> parsedMetadataAndRows =
queueStream.map(new Function<String, ParsedDocument>() {
            @Override
            ParsedDocument call(String metadata) throws Exception {
                Map gson = new Gson().fromJson(metadata, Map.class)

                // get metadata from gson
                String s3Url = gson.url
                String delimiter = gson.delimiter
                // etc...

                // read s3Url
                Dataset dataFrame = sqlContext.read()
                        .format("com.databricks.spark.csv")
                        .option("delimiter", delimiter)
                        .option("header", true)
                        .option("inferSchema", true)
                        .load(url)

                // process document,
                ParsedDocument docPlusRows = //...

                return docPlusRows
            })

            JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
"index/type" // ...


But it appears I cannot get access to the sqlContext when I run this on the
spark cluster because that code is executing in the executor not in the
driver.

Is there a way I can access or create a SqlContext to be able to pull the
file down from S3 in my map function?  Or do you have any recommendations
as to how I could set up a streaming job in a different way that would
allow me to accept metadata on the stream of records coming in and pull
each file down from s3 for processing?

Thanks in advance for your help!

Mike

Re: Spark SQL within a DStream map function

Posted by Burak Yavuz <br...@gmail.com>.
Do you really need to create a DStream from the original messaging queue?
Can't you just read them in a while loop or something on the driver?

On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo <mi...@piragua.com> wrote:

> Hello,
>
> I have a web application that publishes JSON messages on to a messaging
> queue that contain metadata and a link to a CSV document on S3.  I'd like
> to iterate over these JSON messages, and for each one pull the CSV document
> into spark SQL to transform it (based on the metadata in the JSON message)
> and output the results to a search index.  Each file on S3 has different
> headers, potentially different delimiters, and differing numbers of rows.
>
> Basically what I'm trying to do is something like this:
>
>         JavaDStream<ParsedDocument> parsedMetadataAndRows =
> queueStream.map(new Function<String, ParsedDocument>() {
>             @Override
>             ParsedDocument call(String metadata) throws Exception {
>                 Map gson = new Gson().fromJson(metadata, Map.class)
>
>                 // get metadata from gson
>                 String s3Url = gson.url
>                 String delimiter = gson.delimiter
>                 // etc...
>
>                 // read s3Url
>                 Dataset dataFrame = sqlContext.read()
>                         .format("com.databricks.spark.csv")
>                         .option("delimiter", delimiter)
>                         .option("header", true)
>                         .option("inferSchema", true)
>                         .load(url)
>
>                 // process document,
>                 ParsedDocument docPlusRows = //...
>
>                 return docPlusRows
>             })
>
>             JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
> "index/type" // ...
>
>
> But it appears I cannot get access to the sqlContext when I run this on
> the spark cluster because that code is executing in the executor not in the
> driver.
>
> Is there a way I can access or create a SqlContext to be able to pull the
> file down from S3 in my map function?  Or do you have any recommendations
> as to how I could set up a streaming job in a different way that would
> allow me to accept metadata on the stream of records coming in and pull
> each file down from s3 for processing?
>
> Thanks in advance for your help!
>
> Mike
>