You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Raghav Kumar Gautam (JIRA)" <ji...@apache.org> on 2018/09/13 17:32:00 UTC

[jira] [Created] (SPARK-25424) Window duration and slide duration with negative values should fail fast

Raghav Kumar Gautam created SPARK-25424:
-------------------------------------------

             Summary: Window duration and slide duration with negative values should fail fast
                 Key: SPARK-25424
                 URL: https://issues.apache.org/jira/browse/SPARK-25424
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.1
            Reporter: Raghav Kumar Gautam
             Fix For: 2.3.2


In TimeWindow class window duration and slide duration is not be allowed to take negative values.

Currently this is enforced by catalyst. It can be enforced by constructor of TimeWindow allowing it to fail fast.

For e.g. the code below throws following error. Note that the error is produced at the time of count() call instead of window() call.
{code}
val df = spark.readStream
  .format("rate")
  .option("numPartitions", "2")
  .option("rowsPerSecond", "10")
  .load()
  .filter("value % 20 == 0")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "-10 seconds", "5 seconds"))
  .count()
{code}

Error:
{code}
cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data type mismatch: The window duration (-10000000) must be greater than 0.;;
'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, count(1) AS count#57L]
+- AnalysisBarrier
      +- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
         +- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
            +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]

org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data type mismatch: The window duration (-10000000) must be greater than 0.;;
'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, count(1) AS count#57L]
+- AnalysisBarrier
      +- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
         +- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
            +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:66)
	at org.apache.spark.sql.RelationalGroupedDataset.count(RelationalGroupedDataset.scala:239)
	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply$mcV$sp(HelloScalaTest.scala:39)
	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
	at com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
	at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$runTest(AbstractTest.scala:11)
	at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
	at com.hortonworks.qe.AbstractTest.runTest(AbstractTest.scala:11)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$run(AbstractTest.scala:11)
	at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
	at com.hortonworks.qe.AbstractTest.run(AbstractTest.scala:11)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org