Posted to commits@hudi.apache.org by "Hoang Ngo (Jira)" <ji...@apache.org> on 2020/04/07 17:58:00 UTC

[jira] [Commented] (HUDI-105) DeltaStreamer Kafka Ingestion does not handle invalid offsets

    [ https://issues.apache.org/jira/browse/HUDI-105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077475#comment-17077475 ] 

Hoang Ngo commented on HUDI-105:
--------------------------------

Hi [~vinoth],

Do you know if this fix is included in Hudi 0.5.0? I am hitting the same problem. This is my spark-submit command:

spark-submit --conf 'spark.jars=/usr/lib/hudi/hudi-hadoop-mr-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-spark-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-utilities-bundle-0.5.0-incubating.jar' \
 --num-executors 1 \
 --executor-memory 2g \
 --driver-memory 2g \
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle-*.jar` \
 --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
 --storage-type COPY_ON_WRITE \
 --target-base-path s3://mybucket/tmp/hudidata-delta2/ \
 --target-table hudidata-delta2 \
 --props s3://mybucket/tmp/emr/hudipoc/a.properties \
 --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
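For reference, a minimal sketch of what the referenced a.properties might contain (the topic name, broker address, schema paths, and field names below are placeholders, not my actual values). The `auto.offset.reset` entry is the standard Kafka consumer setting for restarting when a saved checkpoint points at offsets the broker no longer has; since the trace goes through the old spark-streaming-kafka 0.8 API, the accepted values there are `smallest`/`largest` rather than the newer `earliest`/`latest`:

```properties
# Hypothetical a.properties sketch -- placeholder values throughout.
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=ts
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://mybucket/schemas/source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://mybucket/schemas/target.avsc
hoodie.deltastreamer.source.kafka.topic=mytopic
metadata.broker.list=broker1:9092
# Where to restart when checkpointed offsets are no longer available
# on the broker (old 0.8-style consumer values: smallest / largest).
auto.offset.reset=smallest
```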

This is my trace:

Exception in thread "main" org.apache.spark.SparkException: Offsets not available on leader: OffsetRange(topic: 'mytopic', partition: 1, range: [0 -> 25447]),OffsetRange(topic: 'mytopic', partition: 2, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 3, range: [0 -> 42775]),OffsetRange(topic: 'mytopic', partition: 7, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 8, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 13, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 16, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 17, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 18, range: [0 -> 6494]),OffsetRange(topic: 'mytopic', partition: 20, range: [0 -> 115661]),OffsetRange(topic: 'mytopic', partition: 21, range: [0 -> 115657])
at org.apache.spark.streaming.kafka.KafkaUtils$.org$apache$spark$streaming$kafka$KafkaUtils$$checkOffsets(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:254)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:250)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:250)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:339)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:334)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:334)
at org.apache.spark.streaming.kafka.KafkaUtils.createRDD(KafkaUtils.scala)
at org.apache.hudi.utilities.sources.AvroKafkaSource.toRDD(AvroKafkaSource.java:67)
at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:61)
at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:71)
at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:292)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:214)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:120)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:292)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/04/07 17:45:12 INFO ShutdownHookManager: Shutdown hook called

> DeltaStreamer Kafka Ingestion does not handle invalid offsets
> -------------------------------------------------------------
>
>                 Key: HUDI-105
>                 URL: https://issues.apache.org/jira/browse/HUDI-105
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Usability, Utilities
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.5.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Reported here [https://github.com/apache/incubator-hudi/issues/643] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)