You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/10 11:36:14 UTC

[GitHub] [hudi] sunke38 opened a new issue, #5550: [SUPPORT]

sunke38 opened a new issue, #5550:
URL: https://github.com/apache/hudi/issues/5550

   SparkHudi.scala
   
   I try to use spark-hudi to read data from kafka then write to hudi. but my code fail to submit to spark. It show 
   
   org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters; is abstract
   
   what is possible reason for this log? Do I miss some depandency?
   `
   
   object SparkHudi {
     val logger = Logger.getLogger(SparkHudi.getClass)
   
     def main(args: Array[String]): Unit = {
   
       val spark = SparkSession
         .builder
         .appName("SparkHudi")
         //.master("local[*]")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.default.parallelism", 9)
         .config("spark.sql.shuffle.partitions", 9)
         .enableHiveSupport()
         .getOrCreate()
   
       // 添加监听器,每一批次处理完成,将该批次的相关信息,如起始offset,抓取记录数量,处理时间打印到控制台
       spark.streams.addListener(new StreamingQueryListener() {
         override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
           println("Query started: " + queryStarted.id)
         }
         override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
           println("Query terminated: " + queryTerminated.id)
         }
         override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
           println("Query made progress: " + queryProgress.progress)
         }
       })
   
       // 定义kafka流
       val dataStreamReader = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "10.10.9.202:9092")
         .option("subscribe", "spark-test")
         .option("startingOffsets", "latest")
         .option("maxOffsetsPerTrigger", 100000)
         .option("failOnDataLoss", false)
   
       // 加载流数据,这里由于只是测试使用,直接读取kafka消息而不作其余处理,是spark结构化流会自动生成每一套消息对应的kafka元数据,如消息所在主题,分区,消息对应offset等。
       val df = dataStreamReader.load()
         .selectExpr(
           "topic as kafka_topic",
       "CAST(partition AS STRING) kafka_partition",
       "cast(timestamp as String) kafka_timestamp",
       "CAST(offset AS STRING) kafka_offset",
       "CAST(key AS STRING) kafka_key",
       "CAST(value AS STRING) kafka_value",
       "current_timestamp() current_time",
       )
       .selectExpr(
         "kafka_topic",
       "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
       "kafka_offset",
       "kafka_timestamp",
       "kafka_key",
       "kafka_value",
       "substr(current_time,1,10) partition_date")
   
       // 建立并启动query
       val query = df
         .writeStream
         .queryName("demo")
         .foreachBatch ( (batchDF: DataFrame, _: Long) => {
           batchDF.persist()
   
           println(LocalDateTime.now() + "start writing cow table")
           batchDF.write.format("org.apache.hudi")
             .option(TABLE_TYPE.key(), "COPY_ON_WRITE")
             .option(PRECOMBINE_FIELD.key(), "kafka_timestamp")
             // 以kafka分区和偏移量做为组合主键
             .option(RECORDKEY_FIELD.key(), "kafka_partition_offset")
             // 以当前日期做为分区
             .option(PARTITIONPATH_FIELD.key(), "partition_date")
             .option("hoodie.table.name", "copy_on_write_table")
             .option(HIVE_STYLE_PARTITIONING.key(), true)
             .mode(SaveMode.Append)
             .save("/tmp/sparkHudi/COPY_ON_WRITE")
   
           println(LocalDateTime.now() + "start writing mor table")
           batchDF.write.format("org.apache.hudi")
             .option(TABLE_TYPE.key(), "MERGE_ON_READ")
             //.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
             .option(PRECOMBINE_FIELD.key(), "kafka_timestamp")
             .option(RECORDKEY_FIELD.key(), "kafka_partition_offset")
             .option(PARTITIONPATH_FIELD.key(), "partition_date")
             .option("hoodie.table.name", "merge_on_read_table")
             .option(HIVE_STYLE_PARTITIONING.key(), true)
             .mode(SaveMode.Append)
             .save("/tmp/sparkHudi/MERGE_ON_READ")
   
           println(LocalDateTime.now() + "finish")
           batchDF.unpersist()
   
           return unbox(Unit)
         })
         .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")
         .start()
   
       query.awaitTermination()
     }
   }`
   
   Environment Description
   
   Hudi version : 0.11
   
   Spark version : 3.2.1
   
   Hadoop version : 3.2.2
   
   Storage (HDFS/S3/GCS..) : HDFS
   
   Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   Project [kafka_topic#21, concat(kafka_partition#22, -, kafka_offset#24) AS kafka_partition_offset#35, kafka_offset#24, kafka_timestamp#23, kafka_key#25, kafka_value#26, substr(cast(current_time#27 as string), 1, 10) AS partition_date#36]
   +- Project [topic#9 AS kafka_topic#21, cast(partition#10 as string) AS kafka_partition#22, cast(timestamp#12 as string) AS kafka_timestamp#23, cast(offset#11L as string) AS kafka_offset#24, cast(key#7 as string) AS kafka_key#25, cast(value#8 as string) AS kafka_value#26, current_timestamp() AS current_time#27]
      +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1e23578c, KafkaV2[Subscribe[spark-test]]
   
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
   Caused by: java.lang.AbstractMethodError: Method org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters; is abstract
   	at org.apache.spark.sql.adapter.Spark3_2Adapter.getAvroSchemaConverters(Spark3_2Adapter.scala)
   	at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:150)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:241)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
   	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
   	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
   	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
   	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
   	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
   	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
   	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
   	at com.example.SparkHudi$.$anonfun$main$1(SparkHudi.scala:88)
   	at com.example.SparkHudi$.$anonfun$main$1$adapted(SparkHudi.scala:74)
   	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
   	... 1 more
   2022-05-10 11:01:47,576 INFO spark.SparkContext: Invoking stop() from shutdown hook
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1123172633

   if you are using hudi-spark bundle, can you let us know which bundle are you using. or if you can share the launch command, would be good


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
xushiyan commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1131842671

   @sunke38 i saw a version mismatch: `org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2` this is for spark 3.1.2 but your main spark version is 3.2.1. can you fix it and see if problem resolves


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1126597938

   couple of fixes.
   1. remove org.apache.spark:spark-avro_2.12:3.2.1 from your --packages. 
   2. I see you are adding hudi-spark bundle twice. once with --packages and once w/ --jars. Can you just add it only once. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] sunke38 commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
sunke38 commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1126698070

   Still same problem
   
   spark-submit --jars /home/kadm/module/hudi-0.11/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.11.0.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:3.1.0 --master spark://hadoop000:7077 SparkHudi-1.0-SNAPSHOT-shaded.jar
   
   
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
   Caused by: java.lang.AbstractMethodError: Method org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters; is abstract
   	at org.apache.spark.sql.adapter.Spark3_2Adapter.getAvroSchemaConverters(Spark3_2Adapter.scala)
   	at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:150)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:241)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan closed issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
xushiyan closed issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi
URL: https://github.com/apache/hudi/issues/5550


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] sunke38 commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
sunke38 commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1137251392

   @[xushiyan](https://github.com/xushiyan)
   thank you for your help I decide abandon this code to try do it by spark sql. Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5550: [SUPPORT]

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1123171790

   are you using hudi-utilities bundle? if yes, the one you find in maven central for scala12 is meant for spark3.1. If you are interested in hudi-utilities bundle for spark3.2, you may have to locally build the bundle and use it. Let me know if this is the case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] sunke38 commented on issue #5550: [SUPPORT] Issues with Spark3_2Adapter while using spark streaming to write to hudi

Posted by GitBox <gi...@apache.org>.
sunke38 commented on issue #5550:
URL: https://github.com/apache/hudi/issues/5550#issuecomment-1126051339

   > 
   
   Hi [nsivabalan](https://github.com/nsivabalan)
   
   here is my submit command
   
   spark-submit --jars /home/kadm/module/hudi-0.11/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.11.0.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.spark:spark-avro_2.12:3.2.1,org.apache.kafka:kafka-clients:3.1.0,org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.0 --master spark://hadoop000:7077 SparkHudi-1.0-SNAPSHOT-shaded.jar


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org