Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/09/11 22:46:12 UTC

[GitHub] [incubator-hudi] umehrot2 opened a new issue #869: Hudi Spark error when spark bundle jar is added to spark's classpath

URL: https://github.com/apache/incubator-hudi/issues/869
 
 
   We run into a runtime error while packaging Hudi to work on AWS EMR. The issue is reproducible on the latest Spark 2.4.3, as well as on previous versions such as 2.3.0, 2.2.0 and 2.1.0.
   
   ## Reproduction Steps
   
   These are the steps we followed on AWS EMR to reproduce the issue:
   - Launch an AWS EMR 5.26.0 cluster
   - SSH into the cluster and drop the `hudi-spark-bundle` and `databricks-avro` jars under `/usr/lib/spark/jars/`
   - Start spark shell using `spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"`
   - Run sample code that reads existing parquet data, then converts and saves it in Hudi format:
   ```
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.HoodieDataSourceHelpers
   import org.apache.hudi.common.model.HoodieTableType
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions.udf
   import java.util.UUID
   
   var tableName = "xxx"
   var tablePath = "s3://<path-to-save-hudi-table>/" + tableName
   val generateUUID = udf(() => UUID.randomUUID().toString)
   val df0 = spark.read.format("parquet").load("s3://<existing-parquet-data-path>/")
   val df1 = df0.withColumn("record_key", generateUUID())
   df1.write.format("org.apache.hudi")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "<partition_key>") 
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "<precombine_key>")
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "<partition_key>")
    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .mode(SaveMode.Overwrite)
    .save(tablePath)
   ```
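   
   Not part of the original report, but under one of the working configurations described in the findings below, the written table can be read back through the same datasource as a sanity check. A minimal sketch, assuming the number of glob levels matches the table's partition depth:
   ```
   // Hypothetical read-back check: `tablePath` is the same value used for the write,
   // and the "/*/*" glob must match the number of partition levels in the table.
   val readDf = spark.read
     .format("org.apache.hudi")
     .load(tablePath + "/*/*")
   readDf.count()
   ```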
   
   ## Stack Trace
   
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
     at scala.Option.foreach(Option.scala:257)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
     at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
     at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
     at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
     at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:136)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     ... 49 elided
   Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2292)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
     at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2177)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
     at org.apache.spark.scheduler.Task.run(Task.scala:121)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   ```
   
   ## Findings on EMR's side
   
   - On EMR, right now we are only able to make Hudi work with Spark in one of two ways:
      - Pass the `hudi-spark-bundle` and `databricks-avro` jars using the `--jars` option when starting the spark-shell: `spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --jars s3://<path-to-jar>/hudi-spark-bundle-0.5.0-SNAPSHOT.jar`
      - Configure the `spark.jars` property to point to the jars `/../hudi-spark-bundle-0.5.0-SNAPSHOT.jar,/../spark-avro_2.11-4.0.0.jar` (see the sketch after this list)
      - However, it does not work if we add these jars to Spark's classpath as mentioned above, and this happens no matter which Spark or Scala version is used.
   - We tried shading `databricks-avro` inside the bundle jar, and also matching Scala versions, but it still does not work.
   - Several threads exist online where people have run into this error, e.g. https://stackoverflow.com/questions/39953245/how-to-fix-java-lang-classcastexception-cannot-assign-instance-of-scala-collect/39953805#39953805, and it appears to be a class-loading issue that happens when `HoodieRecord` is converted into a `JavaRDD`.
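   
   For reference, here is a minimal sketch of the second working option as it would be set programmatically in a standalone application. The `spark.jars` and `spark.serializer` keys are standard Spark configuration properties; the jar paths below are placeholders, and in `spark-shell` the same values would instead be passed via `--conf`, since the shell creates the session before user code runs:
   ```
   import org.apache.spark.sql.SparkSession

   // Sketch only: distribute the Hudi bundle and spark-avro jars via spark.jars
   // instead of dropping them into /usr/lib/spark/jars/. Paths are placeholders.
   val spark = SparkSession.builder()
     .appName("hudi-write")
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.jars",
       "/path/to/hudi-spark-bundle-0.5.0-SNAPSHOT.jar,/path/to/spark-avro_2.11-4.0.0.jar")
     .getOrCreate()
   ```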
   
