Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/11/15 00:55:58 UTC

[jira] [Assigned] (SPARK-18440) Fix FileStreamSink with aggregation + watermark + append mode

     [ https://issues.apache.org/jira/browse/SPARK-18440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-18440:
------------------------------------

    Assignee: Apache Spark

> Fix FileStreamSink with aggregation + watermark + append mode
> -------------------------------------------------------------
>
>                 Key: SPARK-18440
>                 URL: https://issues.apache.org/jira/browse/SPARK-18440
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>            Reporter: Tathagata Das
>            Assignee: Apache Spark
>             Fix For: 2.1.0
>
>
> SPARK-18012 refactored the file write path in FileStreamSink to use FileFormatWriter, which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used to correctly incrementalize the aggregation. With the addition of watermarks in SPARK-18124, the file stream sink should logically support aggregation + watermark + append mode, but it actually fails with the error below (a minimal repro sketch follows the trace):
> {code}
> 16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
> +- LocalRelation [timestamp#7]
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
> 	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:232)
> 	at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:107)
> 	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:232)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:123)
> 	at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:78)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:440)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:227)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:215)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:215)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:678)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:214)
> 	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:210)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:144)
> Reporter completed abruptly with an exception after receiving event: TestFailed(Ordinal(0, 3),The code passed to failAfter did not complete within 10 seconds.,FileStreamSinkSuite,org.apache.spark.sql.streaming.FileStreamSinkSuite,Some(org.apache.spark.sql.streaming.FileStreamSinkSuite),FileStreamSink - aggregation + watermark + append mode,FileStreamSink - aggregation + watermark + append mode,Vector(),Some(org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to failAfter did not complete within 10 seconds.),Some(12819),Some(IndentedText(- FileStreamSink - aggregation + watermark + append mode,FileStreamSink - aggregation + watermark + append mode,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.streaming.FileStreamSinkSuite),None,pool-1-thread-1-ScalaTest-running-FileStreamSinkSuite,1479169397025).
> java.lang.InterruptedException
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1219)
> 	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
> 	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:338)
> 	at org.scalatest.DispatchReporter.apply(DispatchReporter.scala:291)
> 	at org.scalatest.tools.SbtDispatchReporter$$anonfun$apply$1.apply(SbtDispatchReporter.scala:23)
> 	at org.scalatest.tools.SbtDispatchReporter$$anonfun$apply$1.apply(SbtDispatchReporter.scala:23)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at org.scalatest.tools.SbtDispatchReporter.apply(SbtDispatchReporter.scala:23)
> 	at org.scalatest.tools.Framework$SbtReporter.apply(Framework.scala:1094)
> 	at org.scalatest.WrapperCatchReporter.doApply(CatchReporter.scala:70)
> 	at org.scalatest.CatchReporter$class.apply(CatchReporter.scala:36)
> 	at org.scalatest.WrapperCatchReporter.apply(CatchReporter.scala:63)
> 	at org.scalatest.Suite$.reportTestFailed(Suite.scala:2048)
> 	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:347)
> 	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
> 	at org.apache.spark.sql.streaming.FileStreamSinkSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FileStreamSinkSuite.scala:30)
> 	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
> 	at org.apache.spark.sql.streaming.FileStreamSinkSuite.runTest(FileStreamSinkSuite.scala:30)
> 	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
> 	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
> 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
> 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> 	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
> 	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
> 	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
> 	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
> 	at org.scalatest.Suite$class.run(Suite.scala:1424)
> 	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
> 	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
> 	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
> 	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
> 	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
> 	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
> 	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
> 	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
> 	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
> 	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
> 	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
> 	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> 	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:744)
> {code}
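>
> For context, here is a minimal sketch (not from the original report; the source, paths, and app name are illustrative) of a query shape that exercises this path, i.e. aggregation + watermark + append mode written to a file sink:
> {code}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.window
>
> val spark = SparkSession.builder().appName("watermark-file-sink-repro").getOrCreate()
> import spark.implicits._
>
> // Any streaming source with an event-time column works; the failing test
> // used a MemoryStream. The "rate" source (available in Spark 2.2+) is used
> // here only to keep the sketch self-contained.
> val counts = spark.readStream
>   .format("rate")
>   .load()
>   .withWatermark("timestamp", "10 seconds")
>   .groupBy(window($"timestamp", "5 seconds"))
>   .count()
>
> // Writing to a file sink routes each batch through FileStreamSink and
> // FileFormatWriter, which (before this fix) re-planned it with a
> // non-streaming QueryExecution and hit the assertion above on the
> // EventTimeWatermark node.
> val query = counts.writeStream
>   .format("parquet")
>   .option("path", "/tmp/agg-out")                 // illustrative path
>   .option("checkpointLocation", "/tmp/agg-ckpt")  // illustrative path
>   .outputMode("append")
>   .start()
> {code}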
> This JIRA is to fix this by threading the right (streaming) query execution through to FileFormatWriter.
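>
> To make the planner mismatch concrete, here is a self-contained toy model (all names hypothetical, not Spark internals): a streaming-only logical node can only be planned by the incremental planner, and handing it to the batch planner reproduces, in spirit, the "No plan for EventTimeWatermark" assertion.
> {code}
> // Toy model: a streaming-only node (Watermark) has no batch planning rule.
> sealed trait LogicalNode
> case class Scan(name: String) extends LogicalNode
> case class Watermark(child: LogicalNode) extends LogicalNode // streaming-only
>
> object BatchPlanner {
>   def plan(n: LogicalNode): String = n match {
>     case Scan(name) => s"BatchScan($name)"
>     case _: Watermark =>
>       // Mirrors QueryPlanner.plan's assertion when no strategy applies.
>       throw new AssertionError("assertion failed: No plan for Watermark")
>   }
> }
>
> object IncrementalPlanner {
>   def plan(n: LogicalNode): String = n match {
>     case Scan(name)       => s"BatchScan($name)"
>     case Watermark(child) => s"EventTimeWatermarkExec(${plan(child)})"
>   }
> }
>
> // The fix, in spirit: the sink must consume the plan produced by the
> // incremental planner rather than re-planning with the batch planner.
> // IncrementalPlanner.plan(Watermark(Scan("events")))  // succeeds
> // BatchPlanner.plan(Watermark(Scan("events")))        // AssertionError
> {code}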


