You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Peter Liu <pe...@gmail.com> on 2018/05/24 20:14:47 UTC

re: help with streaming batch interval question needed

 Hi there,

from my apache spark streaming website (see links below),

   - the batch-interval is set when a spark StreamingContext is constructed
   (see example (a) quoted below)
   - the StreamingContext is available in older and new Spark version
   (v1.6, v2.2 to v2.3.0) (see
   https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
   and https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
   )
   - however, example (b) below  doesn't use StreamingContext, but
   StreamingSession object to setup a streaming flow;

What does the usage difference in (a) and (b) mean? I was wondering if this
would mean a different streaming approach ("traditional" streaming vs
structured streaming?

Basically I need to find a way to set the batch-interval in (b), similar as
in (a) below.

Would be great if someone can please share some insights here.

Thanks!

Peter

(a)
https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )

import org.apache.spark._import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)val
*ssc *= new StreamingContext(conf, Seconds(1))


(b)
( from databricks'
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
)

   val *spark *= SparkSession.builder()
        .appName(appName)
      .getOrCreate()
...

jsonOptions = { "timestampFormat": nestTimestampFormat }
parsed = *spark *\
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "nest-logs") \
  .load() \
  .select(from_json(col("value").cast("string"), schema,
jsonOptions).alias("parsed_value"))

Re: help with streaming batch interval question needed

Posted by Peter Liu <pe...@gmail.com>.
 Hi Jacek,

This is exact what i'm looking for. Thanks!!

Also thanks for the link. I just noticed that I can unfold the link of
trigger and see the examples in java and scala languages - what a general
help for a new comer :-)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
<https://urldefense.proofpoint.com/v2/url?u=http-3A__spark.apache.org_docs_latest_api_scala_index.html-23org.apache.spark.sql.streaming.DataStreamWriter&d=DwMFaQ&c=jf_iaSHvJObTbx-siA1ZOg&r=9YF85k6Q86ELSbXl40mkGw&m=m2sU8kZLTdkMnwBSeE0_Zas-dlFoPDb3AeWH4V62vRo&s=RYwav9pkXP6vR0vMTl8w1BFABs-EQPuJ-mY376ARQPA&e=>
def trigger(trigger: Trigger
<http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/Trigger.html>
): DataStreamWriter
<http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html>
[T]

Set the trigger for the stream query. The default value is ProcessingTime(0)
and it will run the query as fast as possible.

Scala Example:
df.writeStream.trigger(ProcessingTime("10 seconds"))

import scala.concurrent.duration._
df.writeStream.trigger(ProcessingTime(10.seconds))

Java Example:
df.writeStream().trigger(ProcessingTime.create("10 seconds"))

import java.util.concurrent.TimeUnit
df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))

Muchly appreciated!

Peter


On Fri, May 25, 2018 at 9:11 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Peter,
>
> > Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> That's trigger method on DataStreamWriter.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#
> org.apache.spark.sql.streaming.DataStreamWriter
>
> import org.apache.spark.sql.streaming.Trigger
> df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
>
> See http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#triggers
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, May 24, 2018 at 10:14 PM, Peter Liu <pe...@gmail.com> wrote:
>
>> Hi there,
>>
>> from my apache spark streaming website (see links below),
>>
>>    - the batch-interval is set when a spark StreamingContext is
>>    constructed (see example (a) quoted below)
>>    - the StreamingContext is available in older and new Spark version
>>    (v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>>    1.6.0/streaming-programming-guide.html
>>    <https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html>
>>    and https://spark.apache.org/docs/2.3.0/streaming-programming-gu
>>    ide.html )
>>    - however, example (b) below  doesn't use StreamingContext, but
>>    StreamingSession object to setup a streaming flow;
>>
>> What does the usage difference in (a) and (b) mean? I was wondering if
>> this would mean a different streaming approach ("traditional" streaming vs
>> structured streaming?
>>
>> Basically I need to find a way to set the batch-interval in (b), similar
>> as in (a) below.
>>
>> Would be great if someone can please share some insights here.
>>
>> Thanks!
>>
>> Peter
>>
>> (a)
>> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>>
>> import org.apache.spark._import org.apache.spark.streaming._
>> val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= new StreamingContext(conf, Seconds(1))
>>
>>
>> (b)
>> ( from databricks' https://databricks.com/blog/20
>> 17/04/26/processing-data-in-apache-kafka-with-structured-str
>> eaming-in-apache-spark-2-2.html)
>>
>>    val *spark *= SparkSession.builder()
>>         .appName(appName)
>>       .getOrCreate()
>> ...
>>
>> jsonOptions = { "timestampFormat": nestTimestampFormat }
>> parsed = *spark *\
>>   .readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "nest-logs") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
>>
>>
>>
>>
>>
>

Re: help with streaming batch interval question needed

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Peter,

> Basically I need to find a way to set the batch-interval in (b), similar
as in (a) below.

That's trigger method on DataStreamWriter.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, May 24, 2018 at 10:14 PM, Peter Liu <pe...@gmail.com> wrote:

> Hi there,
>
> from my apache spark streaming website (see links below),
>
>    - the batch-interval is set when a spark StreamingContext is
>    constructed (see example (a) quoted below)
>    - the StreamingContext is available in older and new Spark version
>    (v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>    1.6.0/streaming-programming-guide.html
>    <https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html>
>    and https://spark.apache.org/docs/2.3.0/streaming-programming-
>    guide.html )
>    - however, example (b) below  doesn't use StreamingContext, but
>    StreamingSession object to setup a streaming flow;
>
> What does the usage difference in (a) and (b) mean? I was wondering if
> this would mean a different streaming approach ("traditional" streaming vs
> structured streaming?
>
> Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> Would be great if someone can please share some insights here.
>
> Thanks!
>
> Peter
>
> (a)
> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>
> import org.apache.spark._import org.apache.spark.streaming._
> val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= new StreamingContext(conf, Seconds(1))
>
>
> (b)
> ( from databricks' https://databricks.com/blog/
> 2017/04/26/processing-data-in-apache-kafka-with-structured-
> streaming-in-apache-spark-2-2.html)
>
>    val *spark *= SparkSession.builder()
>         .appName(appName)
>       .getOrCreate()
> ...
>
> jsonOptions = { "timestampFormat": nestTimestampFormat }
> parsed = *spark *\
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "nest-logs") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
>
>
>
>
>