Posted to user@spark.apache.org by "hsy541@gmail.com" <hs...@gmail.com> on 2014/07/09 21:21:22 UTC

Some question about SQL and streaming

Hi guys,

I'm a new user to Spark. I would like to know whether there is an example of how to
use Spark SQL and Spark Streaming together. My use case is that I want to run
some SQL on the input stream from Kafka.
Thanks!

Best,
Siyuan

Re: Some question about SQL and streaming

Posted by Tathagata Das <ta...@gmail.com>.
Yes, even though we don't have immediate plans, I definitely would like to
see it happen some time in the not-so-distant future.

TD


On Thu, Jul 10, 2014 at 7:55 PM, Shao, Saisai <sa...@intel.com> wrote:

>  No specific plans to do so, since there is some functional loss, such as the
> time-based windowing function, which is important for streaming SQL. Also,
> keeping compatible with the fast-growing Spark SQL is quite hard. So there are
> no clear plans to submit it upstream.
>
>
>
> -Jerry
>
>
>
> From: Tobias Pfeiffer [mailto:tgp@preferred.jp]
> Sent: Friday, July 11, 2014 10:47 AM
>
> To: user@spark.apache.org
> Subject: Re: Some question about SQL and streaming
>
>
>
> Hi,
>
>
>
> On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai <sa...@intel.com>
> wrote:
>
> Actually, we have a POC project which shows the power of combining Spark
> Streaming and Catalyst; it can run SQL on top of Spark Streaming and
> produce a SchemaDStream. You can take a look at it:
> https://github.com/thunderain-project/StreamSQL
>
>
>
> Wow, that looks great! Any plans to get this code (or functionality)
> merged into Spark?
>
>
>
> Tobias
>
>
>

RE: Some question about SQL and streaming

Posted by "Shao, Saisai" <sa...@intel.com>.
No specific plans to do so, since there is some functional loss, such as the time-based windowing function, which is important for streaming SQL. Also, keeping compatible with the fast-growing Spark SQL is quite hard. So there are no clear plans to submit it upstream.
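
For what it's worth, with plain Spark Streaming plus Spark SQL a time-based window can still be applied at the DStream level before the per-batch SQL runs; what cannot be expressed is the window inside the SQL statement itself. A rough sketch, reusing the lines, sqlc and query names from Tobias's transform() snippet in this thread (the window and slide durations are arbitrary examples):

import org.apache.spark.streaming.Seconds

// Build a 60-second window that slides every 10 seconds, then run the
// same per-batch SQL over each windowed RDD.
val windowedResult = lines.window(Seconds(60), Seconds(10)).transform(rdd => {
  rdd.registerAsTable("data")
  sqlc.sql(query)
})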

-Jerry

From: Tobias Pfeiffer [mailto:tgp@preferred.jp]
Sent: Friday, July 11, 2014 10:47 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Hi,

On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai <sa...@intel.com> wrote:
Actually, we have a POC project which shows the power of combining Spark Streaming and Catalyst; it can run SQL on top of Spark Streaming and produce a SchemaDStream. You can take a look at it: https://github.com/thunderain-project/StreamSQL

Wow, that looks great! Any plans to get this code (or functionality) merged into Spark?

Tobias


Re: Some question about SQL and streaming

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai <sa...@intel.com>
wrote:

>  Actually, we have a POC project which shows the power of combining Spark
> Streaming and Catalyst; it can run SQL on top of Spark Streaming and
> produce a SchemaDStream. You can take a look at it:
> https://github.com/thunderain-project/StreamSQL
>

Wow, that looks great! Any plans to get this code (or functionality) merged
into Spark?

Tobias

RE: Some question about SQL and streaming

Posted by "Shao, Saisai" <sa...@intel.com>.
Actually, we have a POC project which shows the power of combining Spark Streaming and Catalyst; it can run SQL on top of Spark Streaming and produce a SchemaDStream. You can take a look at it: https://github.com/thunderain-project/StreamSQL

Thanks
Jerry

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Friday, July 11, 2014 10:17 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Yeah, the right solution is to have something like SchemaDStream, where the schema of all the SchemaRDDs generated by it can be stored. Something I really would like to see happen in the future :)

TD

On Thu, Jul 10, 2014 at 6:37 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
Hi,

I think it would be great if we could do the string parsing only once and then just apply the transformation for each interval (reducing the processing overhead for short intervals).

Also, one issue with the approach above is that transform() has the following signature:

  def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})

the variable `result` is of type DStream[Row]. That is, the meta-information from the SchemaRDD is lost and, from what I understand, there is then no way to learn about the column names of the returned data, as this information is only encoded in the SchemaRDD. I would love to see a fix for this.

Thanks
Tobias




Re: Some question about SQL and streaming

Posted by Tathagata Das <ta...@gmail.com>.
Yeah, the right solution is to have something like SchemaDStream, where the
schema of all the SchemaRDDs generated by it can be stored. Something I
really would like to see happen in the future :)
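
To make the idea concrete, a purely hypothetical sketch (no such class exists in Spark today): keep the column names next to the DStream of Rows produced by the per-batch SQL, so downstream code can still look fields up by name.

import org.apache.spark.sql.Row
import org.apache.spark.streaming.dstream.DStream

// Hypothetical wrapper: the schema (here reduced to column names) is stored
// once, alongside the stream of Rows produced by the per-batch SQL.
case class SchemaDStreamSketch(columnNames: Seq[String], rows: DStream[Row]) {
  // Extract one column by name from every Row of every batch.
  def column(name: String): DStream[Any] = {
    val i = columnNames.indexOf(name)
    rows.map(row => row(i))
  }
}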

TD


On Thu, Jul 10, 2014 at 6:37 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
>
> I think it would be great if we could do the string parsing only once and
> then just apply the transformation for each interval (reducing the
> processing overhead for short intervals).
>
> Also, one issue with the approach above is that transform() has the
> following signature:
>
>   def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]
>
> and therefore, in my example
>
> val result = lines.transform((rdd, time) => {
>>>   // execute statement
>>>   rdd.registerAsTable("data")
>>>   sqlc.sql(query)
>>> })
>>>
>>
> the variable `result` is of type DStream[Row]. That is, the
> meta-information from the SchemaRDD is lost and, from what I understand,
> there is then no way to learn about the column names of the returned data,
> as this information is only encoded in the SchemaRDD. I would love to see a
> fix for this.
>
> Thanks
> Tobias
>
>
>

Re: Some question about SQL and streaming

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

I think it would be great if we could do the string parsing only once and
then just apply the transformation for each interval (reducing the
processing overhead for short intervals).

Also, one issue with the approach above is that transform() has the
following signature:

  def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
>>   // execute statement
>>   rdd.registerAsTable("data")
>>   sqlc.sql(query)
>> })
>>
>
the variable `result` is of type DStream[Row]. That is, the
meta-information from the SchemaRDD is lost and, from what I understand,
there is then no way to learn about the column names of the returned data,
as this information is only encoded in the SchemaRDD. I would love to see a
fix for this.
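
Until something like that exists, a workaround sketch (all names below are made up): since the query string is written by us, its output columns are known in advance, so the Rows can be mapped into a case class with named fields right inside transform(), giving a typed DStream instead of a bare DStream[Row].

// Hypothetical example: assumes StringWrapper has a single field called "line"
// and the query selects just that column.
case class ResultLine(line: String)

val typedResult = lines.transform(rdd => {
  rdd.registerAsTable("data")
  sqlc.sql("SELECT line FROM data")
      .map(row => ResultLine(row.getString(0)))   // re-attach the column name
})
// typedResult is a DStream[ResultLine], so the field name travels with the data.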

Thanks
Tobias

Re: Some question about SQL and streaming

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Yes, this is what I tried, but thanks!


On Wed, Jul 9, 2014 at 6:02 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Siyuan,
>
> I do it like this:
>
> // get data from Kafka
> val ssc = new StreamingContext(...)
> val kvPairs = KafkaUtils.createStream(...)
> // we need to wrap the data in a case class for registerAsTable() to
> succeed
> val lines = kvPairs.map(_._2).map(s => StringWrapper(s))
> val result = lines.transform((rdd, time) => {
>   // execute statement
>   rdd.registerAsTable("data")
>   sqlc.sql(query)
> })
>
> Don't know if it is the best way, but it works.
>
> Tobias
>
>
> On Thu, Jul 10, 2014 at 4:21 AM, hsy541@gmail.com <hs...@gmail.com>
> wrote:
>
>> Hi guys,
>>
>> I'm a new user to Spark. I would like to know whether there is an example of how
>> to use Spark SQL and Spark Streaming together. My use case is that I want to run
>> some SQL on the input stream from Kafka.
>> Thanks!
>>
>> Best,
>> Siyuan
>>
>
>

Re: Some question about SQL and streaming

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Siyuan,

I do it like this:

// get data from Kafka
val ssc = new StreamingContext(...)
val kvPairs = KafkaUtils.createStream(...)
// we need to wrap the data in a case class for registerAsTable() to succeed
val lines = kvPairs.map(_._2).map(s => StringWrapper(s))
val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})

Don't know if it is the best way, but it works.
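
For completeness, the surrounding definitions the snippet assumes could look roughly like the following; the case class field name, ZooKeeper address, consumer group, topic and query string are all placeholder values, not taken from a real setup.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// registerAsTable() infers the schema from a case class (a Product type).
case class StringWrapper(line: String)

val conf = new SparkConf().setAppName("KafkaStreamingSQL")
val ssc = new StreamingContext(conf, Seconds(10))
val sqlc = new SQLContext(ssc.sparkContext)
import sqlc.createSchemaRDD   // implicit conversion RDD[StringWrapper] -> SchemaRDD

val query = "SELECT line FROM data"   // placeholder query against the "data" table

val kvPairs = KafkaUtils.createStream(ssc,
  "localhost:2181",       // ZooKeeper quorum (placeholder)
  "my-consumer-group",    // consumer group id (placeholder)
  Map("my-topic" -> 1))   // topic -> number of receiver threads (placeholder)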

Tobias


On Thu, Jul 10, 2014 at 4:21 AM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Hi guys,
>
> I'm a new user to Spark. I would like to know whether there is an example of how
> to use Spark SQL and Spark Streaming together. My use case is that I want to run
> some SQL on the input stream from Kafka.
> Thanks!
>
> Best,
> Siyuan
>