Posted to issues@spark.apache.org by "Harold Hotelling (Jira)" <ji...@apache.org> on 2022/10/05 21:36:00 UTC

[jira] [Created] (SPARK-40668) "Cannot use an UnspecifiedFrame" error for User Defined Aggregation Function over Window

Harold Hotelling created SPARK-40668:
----------------------------------------

             Summary: "Cannot use an UnspecifiedFrame" error for User Defined Aggregation Function over Window
                 Key: SPARK-40668
                 URL: https://issues.apache.org/jira/browse/SPARK-40668
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.1.1
            Reporter: Harold Hotelling


The exception from Spark says "Please file a bug report," so here I am.

Seen with Spark 3.1.1.327 on Scala 2.12; I haven't tried other versions.

Error stack trace:
{code:java}
 testTop3ByUser FAILED
    org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `who` unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.;
    'Aggregate [$anon$1(TestSparkTop3$$anon$1@56cd6f12, None, None, None, encodeusingserializer(input[0, java.lang.Object, true], false), decodeusingserializer(input[0, binary, true], scala.collection.mutable.PriorityQueue, false), encodeusingserializer(input[0, java.lang.Object, true], false), BinaryType, true, 0, 0) windowspecdefinition(who#2L, unspecifiedframe$()) AS $anon$1(encodeusingserializer(input[0, java.lang.Object, true], false) AS `value`, decodeusingserializer(input[0, binary, true], scala.collection.mutable.PriorityQueue, false), encodeusingserializer(input[0, java.lang.Object, true], false)) AS `top3` OVER (PARTITION BY who unspecifiedframe$())#15]
    +- LocalRelation [who#2L, what#3L]
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:161)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:347)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:347)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:344)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:413)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:411)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:364)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:344)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:344)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:413)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:411)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:364)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:344)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:76)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:249)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:152)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:189)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:93)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:90)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:176)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3715)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1462)
        at TestSparkTop3.testTop3ByUser(TestSparkTop3.scala:54){code}
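
The failed plan above still contains windowspecdefinition(who#2L, unspecifiedframe$()) when the analysis checker runs; the analyzer normally replaces an unspecified frame with a default one for resolved window expressions. A possible workaround (a sketch only, not verified against this bug) is to declare the frame explicitly, so the plan never carries an unspecified frame; swapping this in for Window.partitionBy("who") in the repro below would be the only change:
{code:java}
// Sketch of a possible workaround (unverified): give the window an explicit
// ROWS frame, matching the default the analyzer would otherwise substitute,
// so unspecifiedframe$() never reaches the analysis checker.
val explicitWindow = Window
  .partitionBy("who")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
{code}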
Sample unit test to reproduce:
{code:java}
import scala.collection.mutable

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.{Aggregator, Window}
import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test

// A record that a user did something.
case class UserAction(who: Long, what: Long)

// TestSparkSession is a local helper trait (not shown here) that provides the `spark` session.
class TestSparkTop3 extends TestNGSuite with Matchers with TestSparkSession {
  @Test
  def testTop3ByUser(): Unit = {
    import spark.implicits._

    implicit val userActionEncoder: Encoder[UserAction] = Encoders.product[UserAction]

    // Keeps, per partition, the three UserActions with the largest `what`.
    val top3 = new Aggregator[UserAction, mutable.PriorityQueue[UserAction], List[UserAction]] {
      // Shrink the queue to at most three elements by dequeuing the head.
      def limitSize(b: mutable.PriorityQueue[UserAction]): mutable.PriorityQueue[UserAction] = {
        while (b.size > 3) {
          b.dequeue()
        }
        b
      }

      // Reverse the ordering so the head (what dequeue() removes) is the
      // smallest `what`, leaving the three largest actions in the queue.
      override def zero: mutable.PriorityQueue[UserAction] =
        mutable.PriorityQueue.empty[UserAction](Ordering.by[UserAction, Long](_.what).reverse)

      override def reduce(b: mutable.PriorityQueue[UserAction], a: UserAction): mutable.PriorityQueue[UserAction] =
        limitSize(b += a)

      override def merge(b1: mutable.PriorityQueue[UserAction],
        b2: mutable.PriorityQueue[UserAction]): mutable.PriorityQueue[UserAction] = limitSize(b1 ++ b2)

      override def finish(reduction: mutable.PriorityQueue[UserAction]): List[UserAction] = reduction.toList

      override def bufferEncoder: Encoder[mutable.PriorityQueue[UserAction]] = Encoders.javaSerialization

      override def outputEncoder: Encoder[List[UserAction]] = Encoders.javaSerialization
    }

    val values = Seq(
      UserAction(1, 10),
      UserAction(1, 11),
      UserAction(1, 12),
      UserAction(1, 13),
      UserAction(2, 20)
    )

    val ds = spark.createDataset[UserAction](values)

    val result = ds.select(top3.toColumn.name("top3").over(Window.partitionBy("who")))
      .collect()
    println(result.map(_.toString).mkString(", "))
  }
} {code}
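
Another possible workaround (also a sketch, not verified): register the Aggregator as a UDAF with org.apache.spark.sql.functions.udaf (available since Spark 3.0) and apply the resulting function over the window instead of going through toColumn. This assumes the product encoder maps each UserAction field to one input column of the UDAF:
{code:java}
import org.apache.spark.sql.functions.{col, udaf}

// Sketch (unverified): wrap the same Aggregator as a UDAF. The product
// encoder for UserAction should expose `who` and `what` as the two inputs.
val top3Fn = udaf(top3, Encoders.product[UserAction])

val result = ds
  .select(top3Fn(col("who"), col("what")).over(Window.partitionBy("who")).as("top3"))
  .collect()
{code}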


