You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Raghav Kumar Gautam (JIRA)" <ji...@apache.org> on 2018/09/13 17:32:00 UTC
[jira] [Created] (SPARK-25424) Window duration and slide duration
with negative values should fail fast
Raghav Kumar Gautam created SPARK-25424:
-------------------------------------------
Summary: Window duration and slide duration with negative values should fail fast
Key: SPARK-25424
URL: https://issues.apache.org/jira/browse/SPARK-25424
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.1
Reporter: Raghav Kumar Gautam
Fix For: 2.3.2
In TimeWindow class window duration and slide duration is not be allowed to take negative values.
Currently this is enforced by catalyst. It can be enforced by constructor of TimeWindow allowing it to fail fast.
For e.g. the code below throws following error. Note that the error is produced at the time of count() call instead of window() call.
{code}
val df = spark.readStream
.format("rate")
.option("numPartitions", "2")
.option("rowsPerSecond", "10")
.load()
.filter("value % 20 == 0")
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "-10 seconds", "5 seconds"))
.count()
{code}
Error:
{code}
cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data type mismatch: The window duration (-10000000) must be greater than 0.;;
'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, count(1) AS count#57L]
+- AnalysisBarrier
+- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
+- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]
org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data type mismatch: The window duration (-10000000) must be greater than 0.;;
'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, count(1) AS count#57L]
+- AnalysisBarrier
+- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
+- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:66)
at org.apache.spark.sql.RelationalGroupedDataset.count(RelationalGroupedDataset.scala:239)
at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply$mcV$sp(HelloScalaTest.scala:39)
at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$runTest(AbstractTest.scala:11)
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
at com.hortonworks.qe.AbstractTest.runTest(AbstractTest.scala:11)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$run(AbstractTest.scala:11)
at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
at com.hortonworks.qe.AbstractTest.run(AbstractTest.scala:11)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org