Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2016/07/07 17:43:10 UTC
[jira] [Resolved] (SPARK-16350) Complete output mode does not output updated aggregated value in Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu resolved SPARK-16350.
----------------------------------
Resolution: Fixed
Assignee: Liwei Lin
Fix Version/s: 2.1.0, 2.0.1
> Complete output mode does not output updated aggregated value in Structured Streaming
> -------------------------------------------------------------------------------------
>
> Key: SPARK-16350
> URL: https://issues.apache.org/jira/browse/SPARK-16350
> Project: Spark
> Issue Type: Bug
> Components: SQL, Streaming
> Affects Versions: 2.0.0
> Environment: - Spark @c553976
> {code}
> $ java -version
> java version "1.8.0_20"
> Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
> Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
> {code}
> {code}
> $ uname -a
> Darwin mymac.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64
> {code}
> Reporter: Arnaud Bailly
> Assignee: Liwei Lin
> Labels: streaming
> Fix For: 2.0.1, 2.1.0
>
>
> Given the following program:
> {code}
> // A simple Order <-> Items model
> case class Order (
> orderid: Int,
> customer: String
> )
> case class Item (
> orderid: Int,
> itemid: Int,
> amount: Float
> )
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import spark.implicits._
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> // input data comes from CSV files
> val INPUT_DIR: String = ""
> val FILE_EXTENSION: String = ".tbl"
> val ORDERS_FILE: String = INPUT_DIR + "orders" + FILE_EXTENSION
> // Items are added as a stream, so the input is a directory, not a file
> val ITEM_DIR: String = INPUT_DIR + "item"
> val ItemSchema = StructType (
> StructField("orderid" , IntegerType, false) ::
> StructField("itemid" , IntegerType, true) ::
> StructField("amount" , FloatType, true) ::
> Nil)
> val OrderSchema = StructType (
> StructField("orderid" , IntegerType, false) ::
> StructField("customer" , StringType, true) ::
> Nil)
> val csvOptions = Map("sep" -> "|")
> val orders = sqlContext.read.format("csv").schema(OrderSchema).options(csvOptions).load(ORDERS_FILE).as[Order]
> orders.createOrReplaceTempView("orders")
> val itemsStream = sqlContext.readStream.format("csv").schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
> itemsStream.createOrReplaceTempView("itemsStream")
> val sum_of_items_per_customer_streamed =
> sqlContext.sql("SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")
> // print each computed value
> val outwriter = new ForeachWriter[Row] {
> def open(partitionId: Long, version: Long): Boolean = true
> def process(value: Row): Unit = if (value != null) print(value)
> def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
> }
> sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start
> {code}
> and the following data sets:
> - {{orders.tbl}}
> || orderid || customer ||
> |1|foo|
> |2|bar|
> |3|foo|
> - {{items1.tbl}}
> ||orderid||itemid||amount||
> |1|1|1.0|
> |1|2|1.0|
> |1|3|1.0|
> |2|1|1.0|
> |2|2|1.0|
> |3|1|1.0|
> - {{items2.tbl}}
> ||orderid||itemid||amount||
> |1|4|1.0|
> |2|5|1.0|
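To run the reproduction, the three data sets need to exist on disk as pipe-separated files. The sketch below writes them, assuming (not stated in the report) that the files carry no header line, since the program sets only the `sep` option and not `header`, and that spark-shell is started from the same directory, since `INPUT_DIR` is empty. `WriteReproData` is a hypothetical name used only for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper that writes the reproduction's input files into the
// current working directory (INPUT_DIR is "" in the program above).
object WriteReproData {
  // Pipe-separated rows to match csvOptions = Map("sep" -> "|").
  // Assumption: no header line, because the program does not set "header".
  val orders: Seq[String] = Seq("1|foo", "2|bar", "3|foo")
  val items1: Seq[String] =
    Seq("1|1|1.0", "1|2|1.0", "1|3|1.0", "2|1|1.0", "2|2|1.0", "3|1|1.0")
  val items2: Seq[String] = Seq("1|4|1.0", "2|5|1.0")

  // Write one row per line, each terminated by a newline.
  def writeLines(path: Path, lines: Seq[String]): Path =
    Files.write(path, (lines.mkString("\n") + "\n").getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    val base = Paths.get(".")
    writeLines(base.resolve("orders.tbl"), orders)
    writeLines(base.resolve("items1.tbl"), items1)
    writeLines(base.resolve("items2.tbl"), items2)
    // The streaming source watches this directory; it starts out empty and
    // items1.tbl / items2.tbl are copied into it one at a time.
    Files.createDirectories(base.resolve("item"))
  }
}
```

Keeping `items1.tbl` and `items2.tbl` outside `item/` matters: each `cp` into the watched directory is what produces a new micro-batch.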
> When I do the following actions:
> - start bin/spark-shell
> - {{:load complete-bug.scala}}
> - {{cp items1.tbl item/}}
> - {{cp items2.tbl item/}}
> *Then* the following results are printed in the console:
> {code}
> [bar,2.0][foo,4.0]
> [bar,1.0][foo,1.0]
> {code}
> I would expect the following:
> {code}
> [bar,2.0][foo,4.0]
> [bar,3.0][foo,5.0]
> {code}
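The expected and observed lines can be checked by hand. The Spark-free sketch below models the query's join and per-customer `sum(amount)` over the data sets above (the object and method names are hypothetical). It shows that the expected second line is the sum over all items received so far, while the observed second line, `[bar,1.0][foo,1.0]`, coincides with the sum over `items2.tbl` alone.

```scala
// Hypothetical, Spark-free model of:
//   SELECT customer, sum(amount) FROM orders d, itemsStream s
//   WHERE d.orderid = s.orderid GROUP BY customer
object CompleteModeArithmetic {
  final case class Item(orderid: Int, itemid: Int, amount: Float)

  // orders.tbl as orderid -> customer
  val orders: Map[Int, String] = Map(1 -> "foo", 2 -> "bar", 3 -> "foo")

  // items1.tbl and items2.tbl, one micro-batch each
  val batch1: Seq[Item] = Seq(
    Item(1, 1, 1.0f), Item(1, 2, 1.0f), Item(1, 3, 1.0f),
    Item(2, 1, 1.0f), Item(2, 2, 1.0f), Item(3, 1, 1.0f))
  val batch2: Seq[Item] = Seq(Item(1, 4, 1.0f), Item(2, 5, 1.0f))

  // Join each item to its order's customer, then sum amounts per customer.
  // Spark's sum over a FloatType column yields a double, hence toDouble.
  def sumPerCustomer(items: Seq[Item]): Map[String, Double] =
    items.groupBy(i => orders(i.orderid)).map { case (customer, is) =>
      customer -> is.map(_.amount.toDouble).sum
    }

  // Render rows like Row.toString, sorted by customer: [bar,2.0][foo,4.0]
  def render(sums: Map[String, Double]): String =
    sums.toSeq.sortBy(_._1).map { case (c, s) => s"[$c,$s]" }.mkString

  def main(args: Array[String]): Unit = {
    println(render(sumPerCustomer(batch1)))           // first batch
    println(render(sumPerCustomer(batch1 ++ batch2))) // complete mode: all data so far
    println(render(sumPerCustomer(batch2)))           // second batch on its own
  }
}
```

Running `main` prints the first expected line, then the expected cumulative line, then the line that matches what the report observed.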
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)