Posted to issues@spark.apache.org by "Arnaud Bailly (JIRA)" <ji...@apache.org> on 2016/07/02 07:37:10 UTC

[jira] [Created] (SPARK-16350) Complete output mode does not output updated aggregated value in Structured Streaming

Arnaud Bailly created SPARK-16350:
-------------------------------------

             Summary: Complete output mode does not output updated aggregated value in Structured Streaming
                 Key: SPARK-16350
                 URL: https://issues.apache.org/jira/browse/SPARK-16350
             Project: Spark
          Issue Type: Bug
          Components: SQL, Streaming
    Affects Versions: 2.0.0
         Environment: - Spark @c553976

{code}
$ java -version
java version "1.8.0_20"
Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
{code}

{code}
$ uname -a
Darwin mymac.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep  1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64
{code}

            Reporter: Arnaud Bailly


Given the following program:

{code}
// A simple Order <-> Items model
case class Order (
  orderid: Int,
  customer: String
)

case class Item (
  orderid: Int,
  itemid: Int,
  amount: Float
)

import spark.implicits._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// input data comes from CSV files
val INPUT_DIR: String = ""
val FILE_EXTENSION: String = ".tbl"
val ORDERS_FILE: String = INPUT_DIR + "orders" + FILE_EXTENSION

// Items are added as a stream so input is a directory, not a file
val ITEM_DIR: String = INPUT_DIR + "item"

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val ItemSchema = StructType (
  StructField("orderid"        ,  IntegerType, false) ::
    StructField("itemid"      ,  IntegerType, true)   ::
    StructField("amount"     ,  FloatType, true)      ::
    Nil)

val OrderSchema = StructType (
  StructField("orderid"  , IntegerType, false) ::
    StructField("customer" , StringType, true) ::
    Nil)

val csvOptions = Map("sep"  -> "|")

val orders = sqlContext.read.format("csv").schema(OrderSchema).options(csvOptions).load(ORDERS_FILE).as[Order]
orders.registerTempTable("orders")

val itemsStream = sqlContext.readStream.format("csv").schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
itemsStream.registerTempTable("itemsStream")

val sum_of_items_per_customer_streamed =
  sqlContext.sql("SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")

// print each computed value
val outwriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true
  def process(value: Row): Unit = if (value != null) print(value)
  def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
}

sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start
{code}

and the following data sets:

- {{orders.tbl}}
||orderid||customer||
|1|foo|
|2|bar|
|3|foo|

- {{items1.tbl}}
||orderid||itemid||amount||
|1|1|1.0|
|1|2|1.0|
|1|3|1.0|
|2|1|1.0|
|2|2|1.0|
|3|1|1.0|

- {{items2.tbl}}
||orderid||itemid||amount||
|1|4|1.0|
|2|5|1.0|
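
For anyone reproducing this, the three data sets above could be written to disk with a small helper such as the sketch below. It assumes the files are headerless and pipe-delimited, which is what the {{sep}} option and the schemas in the program suggest; the object name {{WriteInputFiles}} and the optional base-directory argument are made up for illustration and are not part of the original report.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper: writes the data sets listed above as raw files.
// Assumption: no header row, fields separated by "|" (matches csvOptions).
object WriteInputFiles {
  val orders = Seq("1|foo", "2|bar", "3|foo")
  val items1 = Seq("1|1|1.0", "1|2|1.0", "1|3|1.0", "2|1|1.0", "2|2|1.0", "3|1|1.0")
  val items2 = Seq("1|4|1.0", "2|5|1.0")

  // Writes one row per line, newline-terminated, creating the directory if needed.
  def write(dir: Path, name: String, rows: Seq[String]): Path = {
    Files.createDirectories(dir)
    val content = rows.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8)
    Files.write(dir.resolve(name), content)
  }

  def main(args: Array[String]): Unit = {
    val base = Paths.get(args.headOption.getOrElse("."))
    write(base, "orders.tbl", orders)
    write(base, "items1.tbl", items1)
    write(base, "items2.tbl", items2)
    // Empty directory watched by the streaming source; the item files
    // are copied into it one at a time in the steps that follow.
    Files.createDirectories(base.resolve("item"))
  }
}
```

The item files are deliberately left outside {{item/}} so that each {{cp}} delivers one new file to the stream.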

When I perform the following steps:

- start {{bin/spark-shell}}
- {{:load complete-bug.scala}}
- {{cp items1.tbl item/}}
- {{cp items2.tbl item/}}

*Then* the following results are printed to the console:

{code}
[bar,2.0][foo,4.0]
[bar,1.0][foo,1.0]
{code}

I would expect the following, since complete output mode should emit the full updated aggregate after each new file:

{code}
[bar,2.0][foo,4.0]
[bar,3.0][foo,5.0]
{code}
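
The arithmetic behind both outputs can be checked without Spark. The sketch below uses plain Scala collections and the data sets listed earlier; the names {{CompleteModeExpectation}}, {{customerOf}}, {{batch1}}, {{batch2}} and {{sumPerCustomer}} are illustrative only. It shows that summing over everything seen so far gives the expected second line, whereas summing over {{items2.tbl}} alone gives the values that were actually printed. That the printed values coincide with a per-file aggregate is an observation, not a confirmed diagnosis.

```scala
// Plain-Scala check of the join + group-by arithmetic (no Spark involved).
object CompleteModeExpectation {
  // orderid -> customer, as in orders.tbl
  val customerOf: Map[Int, String] = Map(1 -> "foo", 2 -> "bar", 3 -> "foo")

  // (orderid, amount) pairs from items1.tbl and items2.tbl
  val batch1: Seq[(Int, Float)] =
    Seq((1, 1.0f), (1, 1.0f), (1, 1.0f), (2, 1.0f), (2, 1.0f), (3, 1.0f))
  val batch2: Seq[(Int, Float)] = Seq((1, 1.0f), (2, 1.0f))

  // Equivalent of: SELECT customer, sum(amount) ... GROUP BY customer
  def sumPerCustomer(items: Seq[(Int, Float)]): Map[String, Float] =
    items
      .groupBy { case (orderId, _) => customerOf(orderId) }
      .map { case (customer, rows) => customer -> rows.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // State after the first file
    println(sumPerCustomer(batch1))
    // Cumulative aggregate over both files: the expected second result
    println(sumPerCustomer(batch1 ++ batch2))
    // Aggregate over the second file only: matches what was printed
    println(sumPerCustomer(batch2))
  }
}
```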




