Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/13 23:42:36 UTC

[GitHub] pgandhi999 opened a new pull request #23778: [SPARK-24935][SQL] : Problem with Executing Hive UDF's from Spark 2.2 Onwards

URL: https://github.com/apache/spark/pull/23778
 
 
   A user of the sketches library (https://github.com/DataSketches/sketches-hive) reported an issue with the HLL Sketch Hive UDAF that appears to be a bug in Spark or Hive. Their code runs fine on Spark 2.1 but fails from 2.2 onwards. For more details, see the discussion on the sketches-user list:
   https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/sketches-user/GmH4-OlHP9g/MW-J7Hg4BwAJ
   
    
   On further debugging, we found that from 2.2 onwards, Spark's Hive UDAF support provides partial aggregation and no longer includes the functionality that supported complete-mode aggregation (see https://issues.apache.org/jira/browse/SPARK-19060 and https://issues.apache.org/jira/browse/SPARK-18186). As a result, the merge method is called where the UDAF expects the update method to be called (https://github.com/DataSketches/sketches-hive/blob/master/src/main/java/com/yahoo/sketches/hive/hll/SketchEvaluator.java#L56), which throws the exception described in the forum thread above.
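To make the failure mode concrete, here is a minimal, self-contained sketch in plain Scala. It is not the Hive or Spark API: `RowBuffer`, `MergeBuffer`, `update`, and `merge` are hypothetical stand-ins for an evaluator that keeps one buffer type for consuming raw rows and another for merging partial results, which is an assumption about the UDAF based on the `SketchState` / `UnionState` names in the exception.

```scala
// Simplified model of an aggregate with two buffer types.
// All names are hypothetical; this is not the Hive or Spark API.
object AggregationModeSketch {
  sealed trait Buffer
  // Buffer used while consuming raw input rows (plays the role of SketchState).
  final class RowBuffer extends Buffer { var count: Long = 0L }
  // Buffer used while merging partial results (plays the role of UnionState).
  final class MergeBuffer extends Buffer { var count: Long = 0L }

  // update assumes it was handed a RowBuffer.
  def update(buf: Buffer, value: String): Unit = {
    val b = buf.asInstanceOf[RowBuffer]
    b.count += 1
  }

  // merge assumes it was handed a MergeBuffer.
  def merge(buf: Buffer, partial: Long): Unit = {
    val b = buf.asInstanceOf[MergeBuffer]
    b.count += partial
  }

  // Complete mode: a single buffer, update called once per row.
  def completeMode(rows: Seq[String]): Long = {
    val buf = new RowBuffer
    rows.foreach(update(buf, _))
    buf.count
  }

  // Partial aggregation with matching buffers: update per partition,
  // then merge the partial counts into a MergeBuffer.
  def partialThenFinal(partitions: Seq[Seq[String]]): Long = {
    val partials = partitions.map(completeMode)
    val finalBuf = new MergeBuffer
    partials.foreach(merge(finalBuf, _))
    finalBuf.count
  }

  // The mismatch: merge is invoked on a buffer that was created for update.
  // The cast inside merge throws a ClassCastException.
  def mergeIntoRowBuffer(partial: Long): Unit =
    merge(new RowBuffer, partial)
}
```

In this model, `mergeIntoRowBuffer` fails with the same kind of `ClassCastException` as in the report, because the caller and the evaluator disagree about which buffer type is in play.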
   
   
   ## What changes were proposed in this pull request?
   
   Created a new abstract class, HiveTypedImperativeAggregate, which serves as a framework for Hive-related aggregation functions.
   
   Also, there seems to be a bug in SortBasedAggregator, where merge was being called on aggregate buffers without initializing them first. This PR fixes that as well.
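The following is a small illustration of why a buffer needs to be initialized before merge is called on it. It is a sketch in plain Scala under stated assumptions, not the SortBasedAggregator code: `BufferRow`, `Counter`, `initialize`, `merge`, and `mergeAll` are hypothetical, and the `NullPointerException` shown is just one plausible symptom; the actual symptom in Spark may differ.

```scala
// Hypothetical model of an object-based aggregation buffer.
// Not Spark code: names and structure are illustrative only.
object InitializeBeforeMergeSketch {
  final class Counter { var n: Long = 0L }

  // A buffer row with a single object slot, which is null until initialized.
  type BufferRow = Array[AnyRef]
  def newBufferRow(): BufferRow = new Array[AnyRef](1)

  // initialize places a fresh aggregation object in the slot.
  def initialize(row: BufferRow): Unit = {
    row(0) = new Counter
  }

  // merge assumes both rows already hold a Counter.
  def merge(row: BufferRow, input: BufferRow): Unit = {
    val target = row(0).asInstanceOf[Counter]
    val source = input(0).asInstanceOf[Counter]
    target.n += source.n // fails if the target slot was never initialized
  }

  // Helper that builds an initialized partial buffer holding n.
  def partial(n: Long): BufferRow = {
    val row = newBufferRow()
    initialize(row)
    row(0).asInstanceOf[Counter].n = n
    row
  }

  // Merges all inputs into a fresh result buffer, optionally skipping initialize.
  def mergeAll(inputs: Seq[BufferRow], initFirst: Boolean): Long = {
    val result = newBufferRow()
    if (initFirst) initialize(result)
    inputs.foreach(merge(result, _))
    result(0).asInstanceOf[Counter].n
  }
}
```

With `initFirst = true` the partial counts are summed as expected; with `initFirst = false` the first merge dereferences an empty slot and fails.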
   
   ## How was this patch tested?
   
   The steps to reproduce the issue are described in the Google Groups thread linked above; they are repeated here for convenience:
   **1. Download the following three jars from the Maven repository linked at https://datasketches.github.io/docs/downloads.html:**
    - sketches-core
    - sketches-hive
    - memory
   
   **2. Launch spark-shell with the above jars on both the driver and executor classpaths, and run the following commands:**
   scala> def randId=scala.util.Random.nextInt(10000)+1
   
   scala> def randomStringFromCharList(length: Int, chars: Seq[Char]): String = {
        |     val sb = new StringBuilder
        |     for (i <- 1 to length) {
        |       val randomNum = util.Random.nextInt(chars.length)
        |       sb.append(chars(randomNum))
        |     }
        |     sb.toString
        |   }
   
   scala> def randomAlphaNumericString(length: Int): String = {
        |     val chars = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')
        |     randomStringFromCharList(length, chars)
        |   }
   
   
   scala> val df = sc.parallelize(
        | Seq.fill(1000000){(randId,randomAlphaNumericString(64))}
        | ).toDF("id","value")
   
   scala> spark.sql("CREATE TEMPORARY FUNCTION data2sketch AS 'com.yahoo.sketches.hive.hll.DataToSketchUDAF'")
   
   scala> val nDf=df.groupBy("id").agg(expr("data2sketch(value, 21, 'HLL_4') as hll")).select(col("id"),col("hll"))
   nDf: org.apache.spark.sql.DataFrame = [id: int, hll: binary]
   
   scala> nDf.show(10,false)
   
   
   **3. You will see the following exception:**
   
   [Stage 0:>                                                          (0 + 2) / 2]18/08/19 00:47:24 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, gsrd259n31.red.ygrid.yahoo.com, executor 2): java.lang.ClassCastException: com.yahoo.sketches.hive.hll.SketchState cannot be cast to com.yahoo.sketches.hive.hll.UnionState
   	at com.yahoo.sketches.hive.hll.SketchEvaluator.merge(SketchEvaluator.java:56)
   	at com.yahoo.sketches.hive.hll.DataToSketchUDAF$DataToSketchEvaluator.merge(DataToSketchUDAF.java:100)
   	at org.apache.spark.sql.hive.HiveUDAFFunction.merge(hiveUDFs.scala:420)
   	at org.apache.spark.sql.hive.HiveUDAFFunction.merge(hiveUDFs.scala:307)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.merge(interfaces.scala:541)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:174)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:174)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:188)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:182)
   	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.findNextSortedGroup(ObjectAggregationIterator.scala:275)
   	at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.hasNext(ObjectAggregationIterator.scala:247)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.hasNext(ObjectAggregationIterator.scala:81)
   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
   	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.scheduler.Task.run(Task.scala:109)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   	at java.lang.Thread.run(Thread.java:745)
   
   **4. After applying the code changes in this PR, run the same test as above; it should complete without the exception.**
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
