Posted to issues@spark.apache.org by "Arnaud Bailly (JIRA)" <ji...@apache.org> on 2016/07/02 07:37:10 UTC
[jira] [Created] (SPARK-16350) Complete output mode does not output updated aggregated value in Structured Streaming
Arnaud Bailly created SPARK-16350:
-------------------------------------
Summary: Complete output mode does not output updated aggregated value in Structured Streaming
Key: SPARK-16350
URL: https://issues.apache.org/jira/browse/SPARK-16350
Project: Spark
Issue Type: Bug
Components: SQL, Streaming
Affects Versions: 2.0.0
Environment: - Spark @c553976
{code}
$ java -version
java version "1.8.0_20"
Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)
{code}
{code}
$ uname -a
Darwin mymac.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64
{code}
Reporter: Arnaud Bailly
Given the following program:
{code}
// A simple Order <-> Items model
case class Order(
  orderid: Int,
  customer: String
)

case class Item(
  orderid: Int,
  itemid: Int,
  amount: Float
)

import spark.implicits._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// input data comes from CSV files
val INPUT_DIR: String = ""
val FILE_EXTENSION: String = ".tbl"
val ORDERS_FILE: String = INPUT_DIR + "orders" + FILE_EXTENSION
// Items are added as a stream so input is a directory, not a file
val ITEM_DIR: String = INPUT_DIR + "item"

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val ItemSchema = StructType(
  StructField("orderid", IntegerType, false) ::
  StructField("itemid", IntegerType, true) ::
  StructField("amount", FloatType, true) ::
  Nil)

val OrderSchema = StructType(
  StructField("orderid", IntegerType, false) ::
  StructField("customer", StringType, true) ::
  Nil)

val csvOptions = Map("sep" -> "|")

val orders = sqlContext.read.format("csv").schema(OrderSchema).options(csvOptions).load(ORDERS_FILE).as[Order]
orders.registerTempTable("orders")

val itemsStream = sqlContext.readStream.format("csv").schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
itemsStream.registerTempTable("itemsStream")

val sum_of_items_per_customer_streamed =
  sqlContext.sql("SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")

// print each computed value
val outwriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true
  def process(value: Row): Unit = if (value != null) print(value)
  def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
}

sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start
{code}
and the following data sets:
- {{orders.tbl}}
|| orderid || customer ||
|1|foo|
|2|bar|
|3|foo|
- {{items1.tbl}}
||orderid||itemid||amount||
|1|1|1.0|
|1|2|1.0|
|1|3|1.0|
|2|1|1.0|
|2|2|1.0|
|3|1|1.0|
- {{items2.tbl}}
||orderid||itemid||amount||
|1|4|1.0|
|2|5|1.0|
When I do the following actions:
- start {{bin/spark-shell}}
- {{:load complete-bug.scala}}
- {{cp items1.tbl item/}}
- {{cp items2.tbl item/}}
*Then* the following results are printed to the console:
{code}
[bar,2.0][foo,4.0]
[bar,1.0][foo,1.0]
{code}
I would expect the following:
{code}
[bar,2.0][foo,4.0]
[bar,3.0][foo,5.0]
{code}
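For clarity on the expected numbers: orders 1 and 3 belong to {{foo}} and order 2 to {{bar}}, so once both item files have been picked up {{foo}} should total 5.0 and {{bar}} 3.0. A plain-Scala sanity check of that arithmetic (values copied from the tables above):
{code}
// Customer owning each order, from orders.tbl above
val customerOf = Map(1 -> "foo", 2 -> "bar", 3 -> "foo")
// All (orderid, itemid, amount) rows seen once items1.tbl and items2.tbl are both in item/
val items1 = Seq((1, 1, 1.0f), (1, 2, 1.0f), (1, 3, 1.0f), (2, 1, 1.0f), (2, 2, 1.0f), (3, 1, 1.0f))
val items2 = Seq((1, 4, 1.0f), (2, 5, 1.0f))
val expected = (items1 ++ items2)
  .groupBy { case (orderid, _, _) => customerOf(orderid) }
  .mapValues(_.map(_._3).sum)
// expected: bar -> 3.0, foo -> 5.0
println(expected)
{code}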
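If it helps reproduce, the same aggregation can also be observed through the built-in memory sink instead of the {{ForeachWriter}} above (a sketch only, reusing the names defined in the program; not how the bug was originally observed):
{code}
// Sketch: register the complete-mode result as an in-memory table named "sums"
val query = sum_of_items_per_customer_streamed
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("sums")
  .start()

// After each cp into item/, inspect the current complete-mode result
sqlContext.sql("SELECT * FROM sums").show()
{code}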