Posted to dev@hudi.apache.org by nishith agarwal <n3...@gmail.com> on 2019/10/01 03:52:43 UTC

Re: Kafka read exception when using HoodieDeltaStreamer

Gautam,

If the MSK version (which I'm assuming is AWS MSK) is compatible with
kafka-client 0.8, then this might be an authentication issue. There are
some details in this post:
https://stackoverflow.com/questions/48164748/kafka-java-io-eofexception-networkreceive-readfromreadablechannel
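
If the cluster's listeners are TLS-only, a plaintext 0.8 client will see
exactly this kind of EOF when the broker drops the connection. One quick
way to check what a broker port speaks is a plain JVM probe like the
sketch below (generic diagnostic code, not part of Hudi; the host/port
arguments are placeholders):

    import javax.net.ssl.SSLSocket;
    import javax.net.ssl.SSLSocketFactory;

    // Attempts a TLS handshake against a broker port. If the handshake
    // completes, the listener speaks TLS and a plaintext-only 0.8 client
    // cannot talk to it; against a plaintext listener this fails with an
    // SSLException instead.
    public class TlsProbe {
        public static void main(String[] args) throws Exception {
            String host = args[0];                // e.g. an MSK broker hostname
            int port = Integer.parseInt(args[1]); // 9094 is MSK's usual TLS port
            SSLSocketFactory factory =
                (SSLSocketFactory) SSLSocketFactory.getDefault();
            try (SSLSocket socket = (SSLSocket) factory.createSocket(host, port)) {
                socket.startHandshake();
                System.out.println("TLS handshake succeeded: "
                    + socket.getSession().getProtocol());
            }
        }
    }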

Thanks,
Nishith

On Mon, Sep 30, 2019 at 12:39 PM Gautam Nayak <gn...@guardanthealth.com>
wrote:

>
>
> When executing a HoodieDeltaStreamer job, we run into the exception
> below. I see hudi-utilities-bundle is packaged with the Kafka 0.8 client
> libs, but I believe it should be compatible with the MSK version of
> Kafka. Any pointers on what the issue could be?
>
>
>
> Spark - 2.2.1
>
> Kafka – MSK 2.1.0
>
>
>
> 19/09/30 19:10:56 main INFO SimpleConsumer: Reconnect due to error:
>
> java.io.EOFException
>
>        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
>        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>        at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:111)
>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:132)
>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:365)
>        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
>        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>        at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
>        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
>        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
>        at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
>        at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:55)
>        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:72)
>        at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
>        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:299)
>        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:218)
>        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:123)
>        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:498)
>        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:926)
>        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:204)
>        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:229)
>        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
>        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> 19/09/30 19:10:57 main INFO KafkaOffsetGen: HUDI -- getNextOffsetRanges --
> topic name is KAFKA_TEST
>
>
>
> Thanks,
>
> Gautam
>
>

Re: Kafka read exception when using HoodieDeltaStreamer

Posted by Vinoth Chandar <vi...@apache.org>.
Awesome!

Re: Kafka read exception when using HoodieDeltaStreamer

Posted by Gautam Nayak <gn...@guardanthealth.com>.
Thanks Vinoth for the tip. We were able to fix the issue: our Spark cluster (2.2.0) bundled both the spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10 jars. Getting rid of the spark-streaming-kafka-0-10 jars from the cluster resolved the ClassCastException.
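
For anyone hitting the same conflict, one quick way to see which jar a
contested class actually loads from is to ask the JVM directly. A small
diagnostic sketch (generic JVM code, not part of Hudi; run it with the
same classpath as the Spark job):

    // Prints the jar each Kafka broker class resolves to. With mixed Kafka
    // jars on the classpath (e.g. both spark-streaming-kafka-0-8 and -0-10
    // plus the Kafka libraries they pull in), these names can resolve to
    // classes from different Kafka versions, which is how a BrokerEndPoint
    // shows up where a kafka.cluster.Broker is expected. A
    // ClassNotFoundException for one of them is also informative.
    public class ClasspathCheck {
        public static void main(String[] args) throws ClassNotFoundException {
            for (String name : new String[] {"kafka.cluster.Broker",
                                             "kafka.cluster.BrokerEndPoint"}) {
                Class<?> clazz = Class.forName(name);
                System.out.println(name + " -> "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
            }
        }
    }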


Re: Kafka read exception when using HoodieDeltaStreamer

Posted by Vinoth Chandar <vi...@apache.org>.
Thanks for the detailed notes, that helps.

Could you give it a quick shot and try overriding the Kafka version in a
custom build? Wondering if just upgrading Kafka would suffice for your
scenario (without needing the Scala 2.12 bundle).

Re: Kafka read exception when using HoodieDeltaStreamer

Posted by Gautam Nayak <gn...@guardanthealth.com>.
Thanks Nishith for the help. We were able to figure out that Kafka 0.8.2 (clients/broker) doesn't support SSL/TLS, so we created a non-SSL/TLS AWS MSK cluster (Kafka 1.1.1 now) and ran the HoodieDeltaStreamer. We are now getting the following exception. I am thinking that waiting for HUDI-238<https://issues.apache.org/jira/browse/HUDI-238> (which also has an upgrade to Kafka 0.10 planned) would be the right thing, since all our existing Kafka clusters are already on 2.0+. If there is a solution for this error, we would still go for it.

Caused by: org.apache.spark.SparkException: Couldn't connect to leader for topic KAFKA_TEST 0: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:167)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:159)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:339)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
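
As an aside on the SSL/TLS point above (separate from this
ClassCastException): the 0.8-era SimpleConsumer API, the one in the first
stack trace, exposes no security configuration at all, while
security.protocol only arrived with the 0.9+ clients. A rough comparison
sketch (broker addresses are placeholders; the second half assumes a
modern kafka-clients jar on the classpath):

    import java.util.Properties;
    import kafka.consumer.SimpleConsumer;

    public class ClientComparison {
        public static void main(String[] args) {
            // Kafka 0.8: host, port, socket timeout, buffer size, client id.
            // There is no parameter for a security protocol, so a TLS-only
            // listener is simply unreachable from this client.
            SimpleConsumer oldConsumer = new SimpleConsumer(
                "broker.example.com", 9092, 100000, 64 * 1024, "probe");
            oldConsumer.close();

            // Kafka 0.9+ consumers take security settings as plain config:
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker.example.com:9094");
            props.put("security.protocol", "SSL");
        }
    }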

Gautam

