Posted to issues@spark.apache.org by "Piotr Nestorow (JIRA)" <ji...@apache.org> on 2017/03/10 14:55:04 UTC
[jira] [Created] (SPARK-19903) PySpark Kafka streaming query output append mode not possible
Piotr Nestorow created SPARK-19903:
--------------------------------------
Summary: PySpark Kafka streaming query output append mode not possible
Key: SPARK-19903
URL: https://issues.apache.org/jira/browse/SPARK-19903
Project: Spark
Issue Type: Bug
Components: PySpark, Structured Streaming
Affects Versions: 2.1.0
Environment: Ubuntu Linux
Reporter: Piotr Nestorow
A PySpark example reads a Kafka stream, sets a watermark on the event-time column, and aggregates the data over a window. The defined query uses the Append output mode.
The PySpark engine nevertheless reports the error:
'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'
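For context (not part of the original report): append mode emits a windowed aggregate only once the watermark passes the end of that window, which is why Spark requires a watermark for append-mode aggregations in the first place. The intended semantics can be sketched in plain Python, without Spark (the helper names below are hypothetical, chosen to mirror the 30-second window and 30-second watermark delay used in the example):

```python
from collections import defaultdict

WINDOW = 30  # tumbling window size in seconds (matches the example)
DELAY = 30   # watermark delay, i.e. withWatermark("timestamp", "30 seconds")

def window_start(ts):
    """Start of the tumbling window containing event-time ts (seconds)."""
    return ts - ts % WINDOW

def run(events):
    """Simulate append-mode emission for (event_time, word) pairs.

    A window's count is considered final, and appended to the output,
    only once the watermark (max event time seen minus DELAY) passes
    the end of that window. Events older than the watermark are dropped.
    """
    counts = defaultdict(int)  # (window_start, word) -> running count
    max_event_time = 0
    emitted = []
    for ts, word in events:
        max_event_time = max(max_event_time, ts)
        watermark = max_event_time - DELAY
        if ts >= watermark:  # late data beyond the watermark is discarded
            counts[(window_start(ts), word)] += 1
        # Append every window the watermark has now closed.
        closed = [k for k in counts if k[0] + WINDOW <= watermark]
        for k in sorted(closed):
            emitted.append((k[0], k[1], counts.pop(k)))
    return emitted
```

Because the example above does call withWatermark before the aggregation, append mode should be accepted; the fact that the equivalent Scala pipeline works (see below in this report) suggests the watermark is not being recognized when the query is built through the Python API.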
The Python example:
-------------------------------------------------------------------------------
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Group the data by window and word and compute the count of each group
    windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
        window(words.timestamp, "30 seconds", "30 seconds"), words.word
    ).count()

    # Start running the query that prints the running counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option("truncate", "false")\
        .start()

    query.awaitTermination()
---------------------------------------------------------------------
The corresponding example in a Zeppelin notebook:
---------------------------------------------------------------
%spark.pyspark

from pyspark.sql.functions import explode, split, window

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "words")\
    .load()\
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .option("truncate", "false")\
    .start()

query.awaitTermination()
--------------------------------------------------------------------------------------
Note that the Scala version of the same example in a Zeppelin notebook works fine:
----------------------------------------------------------------------------------------
import java.sql.Timestamp

import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._

// Create DataSet representing the stream of input lines from kafka
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "words")
  .load()

// Split the lines into words, retaining timestamps
val words = lines
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
  .as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
  .count()

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .trigger(ProcessingTime("35 seconds"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
-----------------------------------------------------------------------------------------