Posted to user@spark.apache.org by spark receiver <sp...@gmail.com> on 2017/11/13 21:21:25 UTC
Reload some static data during Structured Streaming
Hi
I’m using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great. The thing is, I need to join the Kafka messages with a relatively static table stored in a MySQL database (let’s call it metadata here).
So is it possible to reload the metadata table at some interval (say, daily) without restarting the running Structured Streaming query?
Code snippet as follows:
// df_meta contains important information to join with the dataframe read from kafka
val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load()
df_meta.cache()
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", “x.x.x.x:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
.option("subscribe", "rawdb.raw_data")
.option("failOnDataLoss", true)
.option("startingOffsets", "latest")
.load()
.select($"value".as[Array[Byte]])
.map(avroDeserialize(_))
.as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime").as("a")
.join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", "substring(stime,1,13) STIME_HOUR")
.distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
.option("checkpointLocation", "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
.start("T_CF_TABLE")
.awaitTermination()
Mason
Re: Reload some static data during Structured Streaming
Posted by spark receiver <sp...@gmail.com>.
I need it cached to improve throughput; I only hope it can be refreshed once a day, not every batch.
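A minimal sketch of one way to get that daily refresh while keeping the cache, assuming the df_meta from the snippet above; the scheduler thread, the 24-hour interval, and the eager count() are assumptions, not part of the original code. Spark tracks cached data by logical plan, so unpersisting and re-caching the same DataFrame should make a later micro-batch re-read MySQL and repopulate the cache (behavior may vary by Spark version):

import java.util.concurrent.{Executors, TimeUnit}

// Assumes df_meta is the cached JDBC DataFrame defined in the snippet above.
// The cache is keyed by logical plan, so unpersist + cache + count forces a
// fresh JDBC read and re-caches the result without touching the running query.
val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    df_meta.unpersist(blocking = true) // drop the stale cached rows
    df_meta.cache()                    // mark the plan for caching again
    df_meta.count()                    // eagerly re-read MySQL into the cache
  }
}, 24, 24, TimeUnit.HOURS)             // first run after 24h, then daily

// Start this scheduler before calling awaitTermination(), which blocks the main thread.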
> On Nov 13, 2017, at 4:49 PM, Burak Yavuz <br...@gmail.com> wrote:
>
> I think if you don't cache the jdbc table, then it should auto-refresh.
>
> On Mon, Nov 13, 2017 at 1:21 PM, spark receiver <spark.receiver@gmail.com> wrote:
> Hi
>
> I’m using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great. The thing is, I need to join the Kafka messages with a relatively static table stored in a MySQL database (let’s call it metadata here).
>
> So is it possible to reload the metadata table at some interval (say, daily) without restarting the running Structured Streaming query?
>
> Code snippet as follows:
> // df_meta contains important information to join with the dataframe read from kafka
> val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load()
> df_meta.cache()
> val df = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", “x.x.x.x:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
> .option("subscribe", "rawdb.raw_data")
> .option("failOnDataLoss", true)
> .option("startingOffsets", "latest")
> .load()
> .select($"value".as[Array[Byte]])
> .map(avroDeserialize(_))
> .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime").as("a")
> .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
>
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", "substring(stime,1,13) STIME_HOUR")
> .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
> .option("checkpointLocation", "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
> .start("T_CF_TABLE")
> .awaitTermination()
>
> Mason
>
Re: Reload some static data during Structured Streaming
Posted by Burak Yavuz <br...@gmail.com>.
I think if you don't cache the jdbc table, then it should auto-refresh.
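A minimal sketch of that uncached variant, reusing the names from the quoted snippet below; kafkaStream is an assumed name standing in for the streaming side built in that snippet:

// Same JDBC read as in the quoted code, but with no cache(): each micro-batch
// re-scans v_entity_ap_rel, so metadata updates are picked up automatically,
// at the cost of one MySQL read per trigger (every 5 minutes here).
val df_meta = spark.read.format("jdbc")
  .option("url", mysql_url)
  .option("dbtable", "v_entity_ap_rel")
  .load()

val joined = kafkaStream.as("a").join(df_meta.as("b"), $"a.apmac" === $"b.apmac")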
On Mon, Nov 13, 2017 at 1:21 PM, spark receiver <sp...@gmail.com>
wrote:
> Hi
>
> I’m using Structured Streaming (Spark 2.2) to receive Kafka messages, and it
> works great. The thing is, I need to join the Kafka messages with a relatively
> static table stored in a MySQL database (let’s call it metadata here).
>
> So is it possible to reload the metadata table at some interval (say, daily)
> without restarting the running Structured Streaming query?
>
> Code snippet as follows:
>
> // df_meta contains important information to join with the dataframe read from kafka
>
> val df_meta = spark.read.format("jdbc").option("url", mysql_url).option("dbtable", "v_entity_ap_rel").load()
>
> df_meta.cache()
>
> val df = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", *“*x.x.x.x:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
> .option("subscribe", "rawdb.raw_data")
> .option("failOnDataLoss", true)
> .option("startingOffsets", "latest")
> .load()
> .select($"value".as[Array[Byte]])
> .map(avroDeserialize(_))
> .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime").as("a")
> .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
>
>
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", "substring(stime,1,13) STIME_HOUR")
> .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
> .option("checkpointLocation", "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
> .start("T_CF_TABLE")
> .awaitTermination()
>
>
> Mason
>