Posted to user@spark.apache.org by "hsy541@gmail.com" <hs...@gmail.com> on 2014/07/23 01:03:49 UTC

How to do an interactive Spark SQL

Hi guys,

I'm able to run some of the Spark SQL examples, but the SQL is static in the
code. I would like to know whether there is a way to read the SQL from
somewhere else (a shell, for example).

I can read the SQL statement from Kafka/ZooKeeper, but I cannot share it
with all the workers; a broadcast variable does not seem to work for
updating values.

Moreover, if I use a non-serializable class (DataInputStream, etc.) to read
the SQL from another source, I always get "Task not serializable:
java.io.NotSerializableException".


Best,
Siyuan
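
A minimal sketch of the serialization issue described above (illustrative code, not from this thread): anything referenced inside an RDD closure is serialized and shipped to the executors, so capturing a driver-side DataInputStream triggers exactly this exception, while constructing the reader inside the closure avoids shipping it. (A broadcast variable would not help with the update problem either way, since broadcast values are read-only after creation.)

    import java.io.{ByteArrayInputStream, DataInputStream}
    import org.apache.spark.{SparkConf, SparkContext}

    object SerializationSketch {
      def main(args: Array[String]): Unit = {
        // Local master only to keep the sketch self-contained.
        val sc = new SparkContext(
          new SparkConf().setAppName("SerializationSketch").setMaster("local[2]"))
        val data = sc.parallelize(Seq("a", "b", "c"))

        // BAD: `in` is created on the driver and captured by the closure, so
        // Spark tries to serialize it with the task and fails with
        // "Task not serializable: java.io.NotSerializableException":
        //
        //   val in = new DataInputStream(new ByteArrayInputStream(Array[Byte](42)))
        //   data.map(s => s + in.readByte()).collect()

        // OK: build the non-serializable object on the executor side, once
        // per partition, so it is never shipped over the wire.
        val result = data.mapPartitions { iter =>
          val in = new DataInputStream(new ByteArrayInputStream(Array[Byte](42)))
          val b = in.readByte()
          iter.map(s => s + b)
        }.collect()

        result.foreach(println) // a42, b42, c42
        sc.stop()
      }
    }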

Re: How to do an interactive Spark SQL

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Does anyone have any ideas on this?


Re: How to do an interactive Spark SQL

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
But how do they do the interactive SQL in the demo?
https://www.youtube.com/watch?v=dJQ5lV5Tldw

And if it works in local mode, I think it should also work in cluster mode,
correct?


Re: How to do an interactive Spark SQL

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

as far as I know, after the StreamingContext has started, the processing
pipeline (e.g., filter.map.join.filter) cannot be changed. Since your SQL
statement is transformed into RDD operations when the StreamingContext
starts, I think there is no way to change the statement that is executed
on the current stream once it is running.

Tobias
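
One nuance worth adding to this (standard Spark behavior, not something confirmed in the thread): while the DStream transformation graph is indeed fixed once ssc.start() is called, the function passed to foreachRDD runs on the driver for every batch, so a query string kept on the driver can be re-read there before each sql(...) call. A minimal sketch under that assumption; the currentQuery holder and the socket source are illustrative stand-ins:

    import java.util.concurrent.atomic.AtomicReference

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DriverSideQuery {
      // Hypothetical holder for the latest query; it could be fed by a
      // ZooKeeper watcher or a small control thread on the driver.
      val currentQuery = new AtomicReference("select count(*) from records")

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("DriverSideQuery"))
        val ssc = new StreamingContext(sc, Seconds(2))
        val sqlContext = new SQLContext(sc)

        // Stand-in source; the Kafka stream from this thread would work the same way.
        val lines = ssc.socketTextStream("localhost", 9999)

        lines.foreachRDD { rdd =>
          // This block runs on the driver once per batch, so it always sees
          // the latest value of the driver-side holder.
          val query = currentQuery.get()
          println(s"batch of ${rdd.count()} lines; would run: $query")
          // ... build a SchemaRDD from rdd, register it as a table, then run
          //     sqlContext.sql(query).collect() as in the code further below ...
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }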


Re: How to do an interactive Spark SQL

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
For example, this is what I tested, and it works in local mode. It reads
both the data and the SQL query from Kafka, runs the SQL on each RDD, and
writes the results back to Kafka.
I defined a var called sqlS. In the streaming part, as you can see, I
change the SQL statement whenever a sql message is consumed from Kafka, so
the next call to sql(sqlS) executes the updated query.

But this code doesn't work on a cluster, because, from what I understand,
sqlS is not updated on all the workers.

So my question is: how do I change the value of sqlS at runtime and make
all the workers pick up the latest value?


    var sqlS = "select count(*) from records"
    val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions
    // and implicit conversions.
    import sqlContext._
    import sqlContext.createSchemaRDD

    val tName = "records" // table name referenced by the query in sqlS

    val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)

    // Messages on the sql topic update sqlS; everything else becomes a record.
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      .window(Seconds(4))
      .filter(t => if (t._1 == "sql") { sqlS = t._2; false } else true)
      .map(t => getRecord(t._2.split("#")))

    val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
    val brokerString =
      ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")

    KafkaSpark.props.put("metadata.broker.list", brokerString)
    val config = new ProducerConfig(KafkaSpark.props)
    val producer = new Producer[String, String](config)

    recordsStream.foreachRDD { recRDD =>
      val schemaRDD = sqlContext.createSchemaRDD(recRDD)
      schemaRDD.registerAsTable(tName)
      val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => s + r.mkString(",") + "\n")
      producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
    }
    ssc.start()
    ssc.awaitTermination()
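
A likely explanation of the local-vs-cluster difference, plus one possible workaround (my reading; not confirmed in the thread): sqlS is mutated inside the filter closure, which runs on the executors. In local mode the executors live in the driver JVM, so that mutation happens to be visible to the sql(sqlS) call in foreachRDD; on a cluster each task mutates its own deserialized copy of the closure, and the driver's sqlS never changes. Keeping the update on the driver avoids this, e.g. by giving the sql topic its own stream and collecting it back each batch (a fragment building on the variables above):

    // Separate stream for the (tiny) sql topic, so query updates never
    // depend on executor-side mutation of a captured variable.
    val sqlStream = KafkaUtils.createStream(ssc, zkQuorum, group,
      collection.immutable.HashMap(sqltopic -> 2)).map(_._2)

    sqlStream.foreachRDD { rdd =>
      // foreachRDD bodies run on the driver; collect() brings the few sql
      // messages of this batch back to it. Keep the most recent statement.
      val updates = rdd.collect()
      if (updates.nonEmpty) sqlS = updates.last
    }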


Re: How to do an interactive Spark SQL

Posted by Zongheng Yang <zo...@gmail.com>.
Can you paste a small code example to illustrate your questions?

Re: How to do an interactive Spark SQL

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Sorry, typo. What I meant was "sharing". If the SQL is changing at runtime,
how do I broadcast it to all the workers that are doing the SQL analysis?

Best,
Siyuan
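
Broadcast variables alone won't do this, which matches the observation earlier in the thread: a broadcast value is written once on the driver and is read-only on the workers. A minimal sketch of the mechanics (illustrative, not from the thread):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("BroadcastSketch").setMaster("local[2]"))

        val query = sc.broadcast("select count(*) from records")
        val rdd = sc.parallelize(1 to 4)

        // Workers see the snapshot taken at broadcast time...
        rdd.foreach(_ => println(query.value))

        // ...and there is no API for pushing a new value into an existing
        // broadcast: "updating" means creating a fresh broadcast and using
        // it in closures created afterwards.
        val query2 = sc.broadcast("select count(*) from records where value > 0")
        rdd.foreach(_ => println(query2.value))

        sc.stop()
      }
    }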


Re: How to do an interactive Spark SQL

Posted by Zongheng Yang <zo...@gmail.com>.
Do you mean that the texts of the SQL queries are hardcoded in the
code? What do you mean by "cannot share the sql to all workers"?
