Posted to user@spark.apache.org by Peter Liu <pe...@gmail.com> on 2018/05/24 20:14:47 UTC
re: help with streaming batch interval question needed
Hi there,
From the Apache Spark Streaming website (see links below):
- the batch interval is set when a Spark StreamingContext is constructed
(see example (a) quoted below);
- StreamingContext is available in both older and newer Spark versions
(v1.6, and v2.2 to v2.3.0); see
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
and https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html;
- however, example (b) below doesn't use StreamingContext; it uses a
SparkSession object to set up the streaming flow.
What does the difference between (a) and (b) mean? I was wondering if it
indicates a different streaming approach ("traditional" Spark Streaming vs.
Structured Streaming).
Basically, I need a way to set the batch interval in (b), similar to
(a) below.
It would be great if someone could share some insights here.
Thanks!
Peter
(a)
(from https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html)
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
(b)
(from the Databricks blog post
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
spark = SparkSession.builder \
    .appName(appName) \
    .getOrCreate()
...
jsonOptions = {"timestampFormat": nestTimestampFormat}
parsed = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "nest-logs") \
    .load() \
    .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
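For comparison with (a), here is a minimal self-contained sketch of the DStream ("traditional" Spark Streaming) approach, assuming Spark 2.x with the spark-streaming module on the classpath. The batch interval is fixed once, as the second argument of the StreamingContext constructor, and applies to every DStream created from that context. The object name DStreamBatchIntervalSketch, the local[2] master, and the use of queueStream in place of a real source such as Kafka are illustrative assumptions, not part of the original examples.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name, used only for this sketch
object DStreamBatchIntervalSketch {
  // Runs a local DStream job with a 1-second batch interval and returns
  // the record count of every non-empty batch, in the order they ran.
  def run(): Seq[Long] = {
    val conf = new SparkConf()
      .setAppName("DStreamBatchIntervalSketch")
      .setMaster("local[2]")
    // The batch interval is a constructor argument: each micro-batch covers 1 second
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream stands in for a real source so the sketch needs no network
    val queue = mutable.Queue[RDD[Int]]()
    val counts = mutable.ArrayBuffer[Long]()

    // foreachRDD runs on the driver once per batch interval
    ssc.queueStream(queue).foreachRDD { rdd =>
      val n = rdd.count()
      if (n > 0) counts.synchronized { counts += n }
    }

    ssc.start()
    // With the default oneAtATime = true, each queued RDD is consumed by its own batch
    queue.synchronized {
      queue += ssc.sparkContext.makeRDD(1 to 100)
      queue += ssc.sparkContext.makeRDD(1 to 50)
    }
    ssc.awaitTerminationOrTimeout(6000) // give several 1-second batches time to run
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    counts.synchronized { counts.toList }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Running main prints the sizes of the non-empty batches, typically one entry per queued RDD; the exact batch boundaries depend on when the job starts relative to the 1-second clock.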
Re: help with streaming batch interval question needed
Posted by Peter Liu <pe...@gmail.com>.
Hi Jacek,
This is exactly what I'm looking for. Thanks!!
Also thanks for the link. I just noticed that I can expand the trigger
entry there and see examples in Java and Scala; what a great help for a
newcomer :-)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
def trigger(trigger: Trigger): DataStreamWriter[T]
Set the trigger for the stream query. The default value is ProcessingTime(0)
and it will run the query as fast as possible.
Scala Example:
import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
import scala.concurrent.duration._
df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
Java Example:
import org.apache.spark.sql.streaming.Trigger;
df.writeStream().trigger(Trigger.ProcessingTime("10 seconds"));
import java.util.concurrent.TimeUnit;
df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS));
Much appreciated!
Peter
On Fri, May 25, 2018 at 9:11 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi Peter,
>
> > Basically I need to find a way to set the batch-interval in (b), similar
> > as in (a) below.
>
> That's the trigger method on DataStreamWriter.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
>
> import org.apache.spark.sql.streaming.Trigger
> df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
>
> See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
Re: help with streaming batch interval question needed
Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Peter,
> Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
That's the trigger method on DataStreamWriter.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski
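To put the trigger suggestion in context, below is a minimal sketch of a complete Structured Streaming query in Scala, assuming Spark 2.2 or later (where Trigger.ProcessingTime and the built-in rate source are available). It illustrates that the SparkSession carries no batch interval; the micro-batch interval is chosen per query on the DataStreamWriter. The rate source stands in for the Kafka source from example (b) so that no broker is needed, and the object name TriggerIntervalSketch, the rowsPerSecond value, the local[2] master, and the console sink are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical object name, used only for this sketch
object TriggerIntervalSketch {
  // Starts a query whose micro-batches fire every second, lets it run for
  // up to 5 seconds, and returns how many progress updates it reported.
  def run(): Int = {
    val spark = SparkSession.builder()
      .appName("TriggerIntervalSketch")
      .master("local[2]")
      .getOrCreate() // note: no batch interval is configured on the session

    // The built-in "rate" source replaces Kafka so the sketch is self-contained
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    // The interval is set per query via DataStreamWriter.trigger
    val query: StreamingQuery = df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination(5000) // wait up to 5 seconds while the trigger fires
    val batches = query.recentProgress.length
    query.stop()
    spark.stop()
    batches
  }

  def main(args: Array[String]): Unit =
    println(s"progress updates reported: ${run()}")
}
```

Dropping the .trigger(...) call falls back to the default described in the DataStreamWriter docs, where the next micro-batch starts as soon as the previous one finishes.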