Posted to issues@spark.apache.org by "Piotr Nestorow (JIRA)" <ji...@apache.org> on 2017/03/10 14:55:04 UTC

[jira] [Created] (SPARK-19903) PySpark Kafka streaming query output append mode not possible

Piotr Nestorow created SPARK-19903:
--------------------------------------

             Summary: PySpark Kafka streaming query output append mode not possible
                 Key: SPARK-19903
                 URL: https://issues.apache.org/jira/browse/SPARK-19903
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Structured Streaming
    Affects Versions: 2.1.0
         Environment: Ubuntu Linux
            Reporter: Piotr Nestorow


The PySpark example below reads a Kafka stream, sets a watermark on the event-time column used for the windowed aggregation, and defines the query with Append output mode.

When the query is started, PySpark reports the error:
'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'
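
If the failure comes from Structured Streaming's check of Append mode for streaming aggregations rather than from the Kafka source, the same error should be reproducible without Kafka. A minimal sketch using the built-in socket source (the app name and socket details are placeholders; assumes text fed via 'nc -lk 9999'):
-------------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

spark = SparkSession.builder.appName("AppendModeRepro").getOrCreate()

# Socket source; includeTimestamp attaches an event-time column to each line
lines = spark\
    .readStream\
    .format("socket")\
    .option("host", "localhost")\
    .option("port", 9999)\
    .option("includeTimestamp", "true")\
    .load()

# Split the lines into words, retaining timestamps
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Same watermark + window + count pattern as in the Kafka examples below
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"), words.word
    ).count()

# If the same check applies, the analysis error is raised here, when the
# query is started
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .start()

query.awaitTermination()
-------------------------------------------------------------------------------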

The Python example:
-------------------------------------------------------------------------------
from __future__ import print_function  # print(..., file=sys.stderr) below requires this under Python 2

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Group the data by window and word and compute the count of each group
    windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
        window(words.timestamp, "30 seconds", "30 seconds"), words.word
        ).count()

    # Start running the query that prints the running counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option("truncate", "false")\
        .start()

    query.awaitTermination()
---------------------------------------------------------------------
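
Replacing only the final writeStream block of the script above with Complete output mode should let the query start, since Complete mode is supported for streaming aggregations. It is not the wanted behaviour for windowed word counts (every window is re-emitted on each trigger), but it would narrow the problem to the Append-mode check. A sketch of that variant:
-------------------------------------------------------------------------------
    # Workaround sketch, not the desired behaviour: Complete mode re-emits all
    # windows on every trigger instead of emitting each finalized window once
    # after the watermark passes.
    query = windowedCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option("truncate", "false")\
        .start()

    query.awaitTermination()
-------------------------------------------------------------------------------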

The corresponding example in a Zeppelin notebook:
---------------------------------------------------------------
%spark.pyspark

from pyspark.sql.functions import explode, split, window

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "words")\
    .load()\
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"), words.word
    ).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .option("truncate", "false")\
    .start()

query.awaitTermination()
--------------------------------------------------------------------------------------

Note that the Scala version of the same example in a Zeppelin notebook works fine:
----------------------------------------------------------------------------------------
import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._
import spark.implicits._  // for .as[...], $-columns and .toDF (pre-imported in spark-shell)

// Create DataSet representing the stream of input lines from kafka
val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "words")
            .load()

// Split the lines into words, retaining timestamps
val words = lines
            .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
            .as[(String, Timestamp)]
            .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
            .toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words
            .withWatermark("timestamp", "30 seconds")
            .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
            .count()

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
            .outputMode("append")
            .format("console")
            .trigger(ProcessingTime("35 seconds"))
            .option("truncate", "false")
            .start()

query.awaitTermination()
-----------------------------------------------------------------------------------------



