Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/07/02 17:06:11 UTC

[jira] [Assigned] (SPARK-16350) Complete output mode does not output updated aggregated value in Structured Streaming

     [ https://issues.apache.org/jira/browse/SPARK-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-16350:
------------------------------------

    Assignee:     (was: Apache Spark)

> Complete output mode does not output updated aggregated value in Structured Streaming
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-16350
>                 URL: https://issues.apache.org/jira/browse/SPARK-16350
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>         Environment: - Spark @c553976
> {code}
> $ java -version
> java version "1.8.0_20"
> Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
> {code}
> {code}
> $ uname -a
> Darwin mymac.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep  1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64
> {code}
>            Reporter: Arnaud Bailly
>              Labels: streaming
>
> Given the following program:
> {code}
> // A simple Order <-> Items model
> case class Order (
>   orderid: Int,
>   customer: String
> )
> case class Item (
>   orderid: Int,
>   itemid: Int,
>   amount: Float
> )
> import spark.implicits._
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> // input data comes from CSV files
> val INPUT_DIR: String = ""
> val FILE_EXTENSION: String = ".tbl"
> val ORDERS_FILE: String = INPUT_DIR + "orders" + FILE_EXTENSION
> // Items are added as a stream so input is a directory, not a file
> val ITEM_DIR  : String = INPUT_DIR + "item"
> import org.apache.spark.sql.types._
> import org.apache.spark.sql._
> val ItemSchema = StructType (
>   StructField("orderid"        ,  IntegerType, false) ::
>     StructField("itemid"      ,  IntegerType, true)   ::
>     StructField("amount"     ,  FloatType, true)      ::
>     Nil)
> val OrderSchema = StructType (
>   StructField("orderid"  , IntegerType, false) ::
>     StructField("customer" , StringType, true) ::
>     Nil)
> val csvOptions = Map("sep"  -> "|")
> val orders = sqlContext.read.format("csv").schema(OrderSchema).options(csvOptions).load(ORDERS_FILE).as[Order]
> orders.registerTempTable("orders")
> val itemsStream = sqlContext.readStream.format("csv").schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
> itemsStream.registerTempTable("itemsStream")
> val sum_of_items_per_customer_streamed =
>   sqlContext.sql("SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")
> // print each computed value
> val outwriter = new ForeachWriter[Row] {
>   def open(partitionId: Long, version: Long): Boolean = true
>   def process(value: Row): Unit = if (value != null) print(value)
>   def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
> }
> sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start
> {code}
> and the following data sets:
> - {{orders.tbl}}
> ||orderid||customer||
> |1|foo|
> |2|bar|
> |3|foo|
> - {{items1.tbl}}
> ||orderid||itemid||amount||
> |1|1|1.0|
> |1|2|1.0|
> |1|3|1.0|
> |2|1|1.0|
> |2|2|1.0|
> |3|1|1.0|
> - {{items2.tbl}}
> ||orderid||itemid||amount||
> |1|4|1.0|
> |2|5|1.0|
> When I perform the following steps:
> - start {{bin/spark-shell}}
> - {{:load complete-bug.scala}}
> - {{cp items1.tbl item/}}
> - {{cp items2.tbl item/}}
> *Then* the following results are printed on the console:
> {code}
> [bar,2.0][foo,4.0]
> [bar,1.0][foo,1.0]
> {code}
> I would expect the following:
> {code}
> [bar,2.0][foo,4.0]
> [bar,3.0][foo,5.0]
> {code}
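
The expected totals can be checked by hand with a minimal plain-Scala sketch of the same join and group-by. It does not use Spark and is not the reporter's code: the object name `ExpectedCompleteOutput` and the helper `totals` are hypothetical, and the data is copied from the three `.tbl` files in the report.

```scala
// Plain-Scala model of the join + group-by in the report; no Spark required.
// ExpectedCompleteOutput and totals are hypothetical names used for illustration.
object ExpectedCompleteOutput {
  final case class Order(orderid: Int, customer: String)
  final case class Item(orderid: Int, itemid: Int, amount: Float)

  // Contents of orders.tbl, items1.tbl and items2.tbl from the report.
  val orders: Seq[Order] = Seq(Order(1, "foo"), Order(2, "bar"), Order(3, "foo"))
  val items1: Seq[Item] = Seq(
    Item(1, 1, 1.0f), Item(1, 2, 1.0f), Item(1, 3, 1.0f),
    Item(2, 1, 1.0f), Item(2, 2, 1.0f), Item(3, 1, 1.0f))
  val items2: Seq[Item] = Seq(Item(1, 4, 1.0f), Item(2, 5, 1.0f))

  // Join items to orders on orderid, then sum the amounts per customer.
  def totals(items: Seq[Item]): Map[String, Double] = {
    val customerOf = orders.map(o => o.orderid -> o.customer).toMap
    items
      .flatMap(i => customerOf.get(i.orderid).map(c => c -> i.amount.toDouble))
      .groupBy(_._1)
      .map { case (customer, rows) => customer -> rows.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    // Totals over the first file only.
    println(totals(items1))
    // Totals over all input seen so far, as expected from complete mode.
    println(totals(items1 ++ items2))
    // Totals over the second file alone, for comparison with the reported output.
    println(totals(items2))
  }
}
```

Over `items1` alone the totals are bar 2.0 and foo 4.0; over both files they are bar 3.0 and foo 5.0, which is the expected second line. Summing `items2` alone gives bar 1.0 and foo 1.0, which matches the second line actually printed, although this sketch does not by itself establish the cause of the bug.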


