Posted to issues@spark.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2018/11/26 09:31:00 UTC

[jira] [Commented] (SPARK-26167) No output created for aggregation query in append mode

    [ https://issues.apache.org/jira/browse/SPARK-26167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698674#comment-16698674 ] 

Jungtaek Lim commented on SPARK-26167:
--------------------------------------

I guess that in your case all the rows in the memory stream land in a single batch. When Spark runs that first batch, the watermark has not yet been updated, so no windows are finalized and there is no output. After the batch completes, the watermark advances according to the data, but since no new data arrives, no further batch runs that could apply it.
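
Concretely, the watermark is updated only after a batch completes, to (max event time seen) minus the configured delay. A quick sketch of the arithmetic for this dataset (the variable names are mine, for illustration):

{code}
// Rows are 5 minutes apart, so the largest event time in batch 0 is:
val lastEventSec = 1512164314L + 999 * 5 * 60
// After batch 0 the watermark becomes (max event time - 15 minutes):
val watermarkSec = lastEventSec - 15 * 60
// During batch 0 itself the watermark is still at its initial value
// (1970-01-01), so no window ends before it, nothing is finalized, and
// append mode emits an empty Batch 0. Without further input, no batch
// runs afterward to apply the advanced watermark.
{code}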

The issue was resolved by SPARK-24156, which shipped in Spark 2.4.0. The code below is your example converted to Scala, and it works with the latest master.

{code}
// Paste into spark-shell, where `spark` is the active SparkSession
// (spark-shell also auto-imports org.apache.spark.sql.functions._ and
// spark.implicits._, which supplies the Encoder[String] needed below).
import org.apache.spark.sql.{Column, Dataset, Encoders, Row}
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.MemoryStream

// 1000 rows, one every 5 minutes: roughly 83 hours of event time.
val rl = (0 until 1000).map { idx =>
  val t = 1512164314L + idx * 5 * 60
  s"$t,qwer"
}

val nameCol = "name"
val eventTimeCol = "eventTime"
val eventTimestampCol = "eventTimestamp"

// Outside spark-shell, bring an implicit Encoder[String] into scope:
//implicit val strEncoder = Encoders.STRING

val input: MemoryStream[String] = new MemoryStream[String](42, spark.sqlContext)
input.addData(rl.toSeq)

// Split "<epoch seconds>,<name>" into typed columns.
val stream: Dataset[Row] = input.toDF().selectExpr(
  "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
  "cast(split(value,'[,]')[1] as String) as " + nameCol)

println(s"isStreaming: ${stream.isStreaming}")

val eventTime: Column = functions.to_timestamp(col(eventTimestampCol))
val rowData: Dataset[Row] = stream.withColumn(eventTimeCol, eventTime)

val windowDuration = "24 hours"
val slideDuration = "15 minutes"

// 24-hour windows sliding every 15 minutes, with a 15-minute watermark.
val sliding24h: Dataset[Row] = rowData
  .withWatermark(eventTimeCol, slideDuration)
  .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol))
  .count()

val query = sliding24h
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("numRows", 1000)
  .outputMode(OutputMode.Append())
  .start()

query.awaitTermination()
{code}
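
On 2.3.x, a workaround consistent with the explanation above is to feed the MemoryStream in more than one chunk, so that a later micro-batch runs with the watermark advanced by an earlier one. A minimal sketch (the names {{firstHalf}}/{{secondHalf}} are illustrative, and it assumes {{start()}} without the blocking {{awaitTermination()}}):

{code}
// Two addData calls yield two micro-batches; the second one runs with the
// watermark advanced by the first, so finalized windows can be emitted.
val (firstHalf, secondHalf) = rl.splitAt(500)

input.addData(firstHalf)       // batch 0: watermark still at its initial value, no output
query.processAllAvailable()

input.addData(secondHalf)      // batch 1: runs with the advanced watermark; windows
query.processAllAvailable()    // that end before it are now emitted in append mode
{code}

On 2.4.0 and later this is unnecessary, since no-data batches advance the query even when no new input arrives.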


> No output created for aggregation query in append mode
> ------------------------------------------------------
>
>                 Key: SPARK-26167
>                 URL: https://issues.apache.org/jira/browse/SPARK-26167
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: dejan miljkovic
>            Priority: Blocker
>
> For an aggregation query in append mode, not all outputs are produced for inputs whose watermark has expired. I have data in Kafka that needs to be reprocessed and the results stored in S3, and S3 works only with append mode. The problem is that only part of the data is written to S3. The code below illustrates my approach.
> {code}
> String windowDuration = "24 hours";
> String slideDuration = "15 minutes";
> Dataset<Row> sliding24h = rowData
>     .withWatermark(eventTimeCol, slideDuration)
>     .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol))
>     .count();
>
> sliding24h.writeStream()
>     .format("console")
>     .option("truncate", false)
>     .option("numRows", 1000)
>     .outputMode(OutputMode.Append())
>     .start()
>     .awaitTermination();
> {code}
>  
> Below is an example that shows the behavior. The code produces only an empty Batch 0 in append mode. Data is aggregated in 24-hour windows with a 15-minute slide, and the input data covers about 84 hours. I think the code should produce all aggregated results except for the last 15-minute interval.
>  
> {code}
> public static void main(String[] args) throws StreamingQueryException {
>     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
>
>     ArrayList<String> rl = new ArrayList<>();
>     for (int i = 0; i < 1000; ++i) {
>         long t = 1512164314L + i * 5 * 60;
>         rl.add(t + ",qwer");
>     }
>
>     String nameCol = "name";
>     String eventTimeCol = "eventTime";
>     String eventTimestampCol = "eventTimestamp";
>
>     MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
>     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
>     Dataset<Row> stream = input.toDF().selectExpr(
>         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
>         "cast(split(value,'[,]')[1] as String) as " + nameCol);
>
>     System.out.println("isStreaming: " + stream.isStreaming());
>
>     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
>     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
>
>     String windowDuration = "24 hours";
>     String slideDuration = "15 minutes";
>     Dataset<Row> sliding24h = rowData
>         .withWatermark(eventTimeCol, slideDuration)
>         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol))
>         .count();
>
>     sliding24h
>         .writeStream()
>         .format("console")
>         .option("truncate", false)
>         .option("numRows", 1000)
>         .outputMode(OutputMode.Append())
>         //.outputMode(OutputMode.Complete())
>         .start()
>         .awaitTermination();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
