Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/21 14:47:55 UTC

[GitHub] [hudi] giannisp-verneek opened a new issue, #5386: [SUPPORT] Kafka - Delta streamer - Spark read fails

giannisp-verneek opened a new issue, #5386:
URL: https://github.com/apache/hudi/issues/5386

   I have a pipeline that sends simple messages to a Kafka topic, writes them to a Hudi table via HoodieDeltaStreamer, and finally reads the table through Spark. I first send a set of messages and read them as a Spark DataFrame; I see the expected output. I then send a new set of messages that update the same keys. Although the delta streamer receives the messages, I cannot read the Hudi table incrementally through Spark. The message schema does not include a date, but I am getting an error for `creteDateRebaseFuncInWrite`. I am using a VM on Oracle's cloud to run this small pipeline:
   
   ```
   Oracle Linux Server release 8.5
   NAME="Oracle Linux Server"
   VERSION="8.5"
   ID="ol"
   ID_LIKE="fedora"
   VARIANT="Server"
   VARIANT_ID="server"
   VERSION_ID="8.5"
   PLATFORM_ID="platform:el8"
   PRETTY_NAME="Oracle Linux Server 8.5"
   ANSI_COLOR="0;31"
   CPE_NAME="cpe:/o:oracle:linux:8:5:server"
   HOME_URL="https://linux.oracle.com/"
   BUG_REPORT_URL="https://bugzilla.oracle.com/"
   
   ORACLE_BUGZILLA_PRODUCT="Oracle Linux 8"
   ORACLE_BUGZILLA_PRODUCT_VERSION=8.5
   ORACLE_SUPPORT_PRODUCT="Oracle Linux"
   ORACLE_SUPPORT_PRODUCT_VERSION=8.5
   Red Hat Enterprise Linux release 8.5 (Ootpa)
   Oracle Linux Server release 8.5
   ```
   
   I'm using the same properties as, and a very similar schema to, the Docker Hudi streaming example.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Start Kafka and create a topic
   2. Run HoodieDeltaStreamer:
   ```
   spark-submit --jars /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar \
                --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar  \
                --spark-master <master_url>   \
                --table-type MERGE_ON_READ   \
                --source-class org.apache.hudi.utilities.sources.JsonKafkaSource   \
                --source-ordering-field ts   \
                --target-base-path /<abs_path>   \
                --target-table <name>  \
                --props /<abs_path>/kafka.properties   \
                --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider   \
                --disable-compaction \
                --continuous
   ```
   3. Send a set of messages
   4. Read the table through Pyspark
   5. Update the same keys 
   6. Read the same table
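
The report does not include the read script used in steps 4 and 6. As a rough sketch only, a PySpark read of the table might look like the following. The helper names, the base path, and the begin instant are hypothetical; the option keys `hoodie.datasource.query.type` and `hoodie.datasource.read.begin.instanttime` are Hudi's read options, and the sketch assumes a matching Hudi Spark bundle is on the Spark classpath.

```python
from typing import Dict, Optional


def hudi_read_options(begin_instant: Optional[str] = None) -> Dict[str, str]:
    """Build Hudi read options (hypothetical helper).

    Without an instant, request a snapshot query; with one, request an
    incremental query starting after that commit instant.
    """
    if begin_instant is None:
        return {"hoodie.datasource.query.type": "snapshot"}
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }


def read_hudi_table(spark, base_path: str, begin_instant: Optional[str] = None):
    """Load the Hudi table under base_path as a DataFrame.

    `spark` is an existing SparkSession; `base_path` is assumed to be the
    same directory passed to the delta streamer as --target-base-path.
    """
    reader = spark.read.format("hudi").options(**hudi_read_options(begin_instant))
    return reader.load(base_path)


# Usage (requires pyspark and the Hudi bundle; not executed here):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName("hudi_read_test").getOrCreate()
#   read_hudi_table(spark, "/path/to/table").show()
```

Keeping the option building separate from the Spark call makes it easy to switch between a snapshot and an incremental read when comparing the output of steps 4 and 6.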
   
   **Expected behavior**
   
   See a table with the updated values. 
   
   **Environment Description**
   
   * Hudi version : 0.10.1 Scala 2.12
   
   * Spark version : 3.1.3 Scala 2.12
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : Local
   
   * Running on Docker? (yes/no) : No
   * Kafka version: 3.1.0 Scala 2.12
   
   * Java version: openjdk version "11.0.14.1" 2022-02-08 LTS
   OpenJDK Runtime Environment 18.9 (build 11.0.14.1+1-LTS)
   OpenJDK 64-Bit Server VM 18.9 (build 11.0.14.1+1-LTS, mixed mode, sharing)
   
   **Stacktrace**
   
   ```
   22/04/21 14:35:38 INFO DAGScheduler: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0) failed in 1.711 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
   	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
   	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
   	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   
   Driver stacktrace:
   22/04/21 14:35:38 INFO DAGScheduler: Job 0 failed: showString at NativeMethodAccessorImpl.java:0, took 1.741374 s
   Traceback (most recent call last):
     File "hudi_read_test.py", line 27, in <module>
       new_df.show()
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o46.showString.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
   	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
   	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
   	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
   	at scala.Option.foreach(Option.scala:407)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
   	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
   	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
   	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
   	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3709)
   	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2735)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
   	at org.apache.spark.sql.Dataset.head(Dataset.scala:2735)
   	at org.apache.spark.sql.Dataset.take(Dataset.scala:2942)
   	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
   	at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.GatewayConnection.run(GatewayConnection.java:238)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
   	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
   	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
   	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
   	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	... 1 more
   
   22/04/21 14:35:38 INFO SparkContext: Invoking stop() from shutdown hook
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] giannisp-verneek commented on issue #5386: [SUPPORT] Kafka - Delta streamer - Spark read fails

Posted by GitBox <gi...@apache.org>.
giannisp-verneek commented on issue #5386:
URL: https://github.com/apache/hudi/issues/5386#issuecomment-1110006891

   @codope, it worked. Thank you! I'm closing this issue




[GitHub] [hudi] codope commented on issue #5386: [SUPPORT] Kafka - Delta streamer - Spark read fails

Posted by GitBox <gi...@apache.org>.
codope commented on issue #5386:
URL: https://github.com/apache/hudi/issues/5386#issuecomment-1109737244

   @giannisp-verneek This looks to be related to https://github.com/apache/spark/commit/0f6cafe410f55ccc1d2106ff2b66efcbbc1b1c0b
   Notice that the function name in your stacktrace differs from the one in Spark v3.1.3's `DataSourceUtils`. Could you please downgrade to Spark 3.1.2 and try again?
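
A mismatch like this only surfaces at run time as a `NoSuchMethodError`, so it can help to check the Spark version before reading. The sketch below is illustrative only: the helper names and the `KNOWN_BAD` set are hypothetical, and the single entry reflects nothing more than what this thread reports (the Hudi 0.10.1 bundle failing on Spark 3.1.3 and the suggestion to use 3.1.2).

```python
from typing import Tuple

# Assumption: taken from this thread only, not from an official
# compatibility matrix. Hudi 0.10.1 failed on Spark 3.1.3 here.
KNOWN_BAD = {(3, 1, 3)}


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn '3.1.3' into (3, 1, 3), ignoring suffixes such as '-SNAPSHOT'."""
    core = version.split("-")[0]
    return tuple(int(part) for part in core.split("."))


def is_known_bad_spark(version: str) -> bool:
    """Return True if the major.minor.patch version is in KNOWN_BAD."""
    return parse_version(version)[:3] in KNOWN_BAD
```

In a PySpark script this could be called as `is_known_bad_spark(spark.version)` right after creating the session, failing fast with a clear message instead of a stack trace from the executors.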




[GitHub] [hudi] giannisp-verneek closed issue #5386: [SUPPORT] Kafka - Delta streamer - Spark read fails

Posted by GitBox <gi...@apache.org>.
giannisp-verneek closed issue #5386: [SUPPORT] Kafka - Delta streamer - Spark read fails
URL: https://github.com/apache/hudi/issues/5386

