Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2017/06/02 21:23:04 UTC
[jira] [Updated] (SPARK-19903) Watermark metadata is lost when using resolved attributes
[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-19903:
-------------------------------------
Summary: Watermark metadata is lost when using resolved attributes (was: PySpark Kafka streaming query output append mode not possible)
> Watermark metadata is lost when using resolved attributes
> ---------------------------------------------------------
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Environment: Ubuntu Linux
> Reporter: Piotr Nestorow
>
> The PySpark example below reads a Kafka stream, sets a watermark when windowing the data, and runs the defined query in Append output mode. The PySpark engine nevertheless reports the error:
> 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'
> The Python example:
> {code}
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("""
>         Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
>         """, file=sys.stderr)
>         exit(-1)
>
>     bootstrapServers = sys.argv[1]
>     subscribeType = sys.argv[2]
>     topics = sys.argv[3]
>
>     spark = SparkSession\
>         .builder\
>         .appName("StructuredKafkaWordCount")\
>         .getOrCreate()
>
>     # Create DataSet representing the stream of input lines from kafka
>     lines = spark\
>         .readStream\
>         .format("kafka")\
>         .option("kafka.bootstrap.servers", bootstrapServers)\
>         .option(subscribeType, topics)\
>         .load()\
>         .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>
>     # Split the lines into words, retaining timestamps
>     # split() splits each line into an array, and explode() turns the array into multiple rows
>     words = lines.select(
>         explode(split(lines.value, ' ')).alias('word'),
>         lines.timestamp
>     )
>
>     # Group the data by window and word and compute the count of each group
>     windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>         window(words.timestamp, "30 seconds", "30 seconds"), words.word
>     ).count()
>
>     # Start running the query that prints the running counts to the console
>     query = windowedCounts\
>         .writeStream\
>         .outputMode('append')\
>         .format('console')\
>         .option("truncate", "false")\
>         .start()
>
>     query.awaitTermination()
> {code}
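>
> Per the updated summary, the watermark metadata seems to be dropped because the grouping expression uses {{words.timestamp}}, an attribute resolved against the pre-watermark {{words}} DataFrame, rather than a column of the DataFrame returned by {{withWatermark}}. Below is a minimal sketch of a possible workaround, assuming that referencing the watermarked DataFrame's own columns preserves the metadata; the {{watermarked}} name is introduced here purely for illustration and is not part of the original report:
> {code}
> # Hypothetical workaround sketch (reuses 'words' and the imports from the
> # example above): keep a handle to the watermarked DataFrame and reference
> # its columns, not attributes resolved from the pre-watermark 'words'.
> watermarked = words.withWatermark("timestamp", "30 seconds")
> windowedCounts = watermarked.groupBy(
>     window(watermarked.timestamp, "30 seconds", "30 seconds"),
>     watermarked.word
> ).count()
> {code}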
> The corresponding example in a Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
>     .readStream\
>     .format("kafka")\
>     .option("kafka.bootstrap.servers", "localhost:9092")\
>     .option("subscribe", "words")\
>     .load()\
>     .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into multiple rows
> words = lines.select(
>     explode(split(lines.value, ' ')).alias('word'),
>     lines.timestamp
> )
>
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>     window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
>
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
>     .writeStream\
>     .outputMode('append')\
>     .format('console')\
>     .option("truncate", "false")\
>     .start()
>
> query.awaitTermination()
> {code}
> Note that the Scala version of the same example in a Zeppelin notebook works fine:
> {code}
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "words")
>   .load()
>
> // Split the lines into words, retaining timestamps
> val words = lines
>   .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>   .as[(String, Timestamp)]
>   .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
>   .toDF("word", "timestamp")
>
> // Group the data by window and word and compute the count of each group
> val windowedCounts = words
>   .withWatermark("timestamp", "30 seconds")
>   .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
>   .count()
>
> // Start running the query that prints the windowed word counts to the console
> val query = windowedCounts.writeStream
>   .outputMode("append")
>   .format("console")
>   .trigger(ProcessingTime("35 seconds"))
>   .option("truncate", "false")
>   .start()
>
> query.awaitTermination()
> {code}
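>
> A possible explanation for the difference, offered as an assumption: the Scala version groups by {{$"timestamp"}} and {{$"word"}}, unresolved column references that are resolved against the watermarked Dataset, while the PySpark version passes {{words.timestamp}} and {{words.word}}, attributes already resolved against the pre-watermark DataFrame, which matches the updated summary above. A Python sketch of the equivalent pattern, using plain column references:
> {code}
> # Hypothetical Python analogue of the Scala pattern: refer to the grouping
> # columns with col(), so they are resolved against the watermarked DataFrame.
> from pyspark.sql.functions import col, window
>
> windowedCounts = words\
>     .withWatermark("timestamp", "30 seconds")\
>     .groupBy(window(col("timestamp"), "30 seconds", "30 seconds"), col("word"))\
>     .count()
> {code}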