Posted to dev@spark.apache.org by Peter Liu <pe...@gmail.com> on 2018/04/30 21:08:10 UTC
re: sharing data via kafka broker using spark streaming / AnalysisException on collect()
Hello there,
I have a quick question regarding how to share data (a small data
collection) between a Kafka producer and a consumer using Spark Streaming
(Spark 2.2):
(A)
The data published by a Kafka producer is received in order on the Kafka
consumer side (see (a) copied below).
(B)
However, collect() or cache() on a streaming DataFrame does not seem to be
supported (see the links in (b) below); I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
(C)
My questions would be:
--- How can I use the collection data (in a streaming DataFrame) that has
arrived on the consumer side, e.g. convert it to an array of objects?
--- Is there maybe another quick way to use Kafka for sharing static data
(instead of streaming) between two Spark application services (without any
common Spark context, session, etc.)? (A batch-read sketch of this idea
follows below.)
I have copied some code snippets in (c).
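For the second question, this is the kind of workaround I have in mind (a
minimal sketch only, not verified on my side; it reuses the kafkaCluster and
Variables names from (c)): read the topic as a batch DataFrame via spark.read
instead of spark.readStream, since collect() is allowed on batch DataFrames:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._ // for the $"..." column syntax

val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

// batch read (spark.read, not spark.readStream): the result is a plain DataFrame
val staticDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .option("startingOffsets", "earliest") // read the topic from the beginning...
  .option("endingOffsets", "latest")     // ...up to its current end (batch queries only)
  .load()
  .select(from_json($"value".cast("string"), mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")

val campaignsArray = staticDf.collect() // fine here: staticDf is not a streaming DataFrame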
Sharing a global collection between a Spark producer and a consumer seems
like a very simple use-case scenario, but I spent an entire day trying
various options and going through online resources (Google in general,
Apache Spark docs, Stack Overflow, Cloudera, etc.).
Any help would be very much appreciated!
Thanks!
Peter
(a) streaming data (df) received on the consumer side (console sink):
root
|-- ad_id: string (nullable = true)
|-- campaign_id: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------------------------+-----------------------+
|ad_id                               |campaign_id                         |timestamp              |
+------------------------------------+------------------------------------+-----------------------+
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|
+------------------------------------+------------------------------------+-----------------------+
(b) online discussions on unsupported operations on streaming DataFrames:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
https://stackoverflow.com/questions/42062092/why-does-using-cache-on-streaming-datasets-fail-with-analysisexception-queries
(c) code snippets:
OK:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._ // for the $"..." column syntax used below

// subscribe to the campaigns topic, reading from the beginning
val rawDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("startingOffsets", "earliest")
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .load()
OK:
val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

// note: using rawDf from above (the snippet originally referenced an undefined
// campaignsDf) and casting the binary Kafka value column to string for from_json
val campaignsDf2 = rawDf
  .select(from_json($"value".cast("string"), mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")
OK:
campaignsDf2.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once, since this is one-time static data
  .start()            // start() is required; awaitTermination() alone won't run the query
  .awaitTermination()
Exception:
val campaignsArrayRows = campaignsDf2.collect() // <==== not supported on a streaming DataFrame ====> AnalysisException!
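One possible workaround for the exception above (again just a sketch, not
verified): drain the one-time data into the in-memory table sink first, then
collect() from the resulting static table:

// write the single micro-batch into an in-memory table (small data only)
val query = campaignsDf2.writeStream
  .format("memory")       // memory sink keeps the result as a temp view
  .queryName("campaigns") // name of the temp view to query afterwards
  .trigger(org.apache.spark.sql.streaming.Trigger.Once())
  .start()

query.awaitTermination() // returns once the single batch has been processed

// "campaigns" is now an ordinary (non-streaming) temp view, so collect() works
val campaignsArrayRows = spark.table("campaigns").collect()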