Posted to dev@spark.apache.org by Basil Hariri <Ba...@microsoft.com.INVALID> on 2018/08/17 22:48:18 UTC

Spark Kafka adapter questions

Hi all,

I work on Azure Event Hubs (Microsoft's PaaS offering similar to Apache Kafka) and am trying to get our new Kafka head<https://azure.microsoft.com/en-us/blog/azure-event-hubs-for-kafka-ecosystems-in-public-preview/> to play nice with Spark's Kafka adapter. The goal is for our Kafka endpoint to be completely compatible with Spark's Kafka adapter, but I'm running into some issues that I think are related to versioning. I've been tinkering with the kafka-0-10-sql<https://github.com/apache/spark/tree/master/external/kafka-0-10-sql> and kafka-0-10<https://github.com/apache/spark/tree/master/external/kafka-0-10> adapters on GitHub and was wondering if someone could take a second to point me in the right direction on the following:


  1.  What is the difference between those two adapters? My hunch is that kafka-0-10-sql supports Structured Streaming while kafka-0-10 still uses Spark Streaming, but I haven't found anything to verify that.
  2.  Event Hubs' Kafka endpoint only supports Kafka 1.0 and later, and the errors I get when trying to connect Spark to it ("failed to send SSL close message" / broken pipe errors) are the ones we usually see when Kafka v0.10 applications connect to our endpoint. I built from source after I saw that both libraries were updated for Kafka 2.0 support (late last week), but I'm still running into the same issues. Do Spark's Kafka adapters generally downgrade to Kafka v0.10 protocols? If not, is there any other reason to believe that a Kafka "broker" that supports v1.0+ but not the v0.10 protocols would be incompatible with Spark's Kafka adapter?
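For context on question 1, here is a minimal sketch of how the kafka-0-10-sql module is driven from Structured Streaming, following Spark's Structured Streaming + Kafka integration guide: the source is selected with `format("kafka")`, and Kafka consumer properties are passed through by prefixing them with `kafka.`. The helper `kafka_source_options`, the broker address and the topic name are hypothetical placeholders. The kafka-0-10 module, by contrast, backs the older DStream API (`KafkaUtils.createDirectStream`), which is not shown here.

```python
from typing import Dict, Optional


def kafka_source_options(bootstrap_servers: str, topic: str,
                         consumer_props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: build options for spark.readStream.format("kafka").

    The Structured Streaming Kafka source (kafka-0-10-sql) hands options whose
    key starts with "kafka." to the underlying KafkaConsumer, with the prefix
    stripped; "subscribe" is a source option, not a consumer property.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    for key, value in (consumer_props or {}).items():
        # Prefix raw consumer properties so the source passes them through.
        options["kafka." + key] = value
    return options


if __name__ == "__main__":
    opts = kafka_source_options("broker.example.com:9092", "events",
                                {"security.protocol": "SSL"})
    for key, value in sorted(opts.items()):
        print(key, "=", value)
    # With PySpark installed and a SparkSession named `spark`, roughly:
    # df = spark.readStream.format("kafka").options(**opts).load()
```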

Thanks in advance. Please let me know if there's a different place I should be posting this.

Sincerely,
Basil


Re: Spark Kafka adapter questions

Posted by Ted Yu <yu...@gmail.com>.
After a brief check, I found KAFKA-5649, where an almost identical error was
reported.

There is also KAFKA-3702, which is related but currently open.

I will dig some more to see what I can find.

Cheers

On Mon, Aug 20, 2018 at 3:53 PM Basil Hariri <Ba...@microsoft.com>
wrote:


RE: Spark Kafka adapter questions

Posted by Basil Hariri <Ba...@microsoft.com.INVALID>.
I am pretty sure I got those changes in the jar I compiled (I pulled from master on 8/8, and it looks like SPARK-18057 was resolved on 8/3), but no luck. Here is a copy-paste of the error I’m seeing. For reference, the Event Hubs Kafka head appears as <broker FQDN>:9093 in the log: we connect to port 9093 on an FQDN instead of port 9092 on a Kafka broker’s IP address, but I don’t think that should change anything.


18/08/20 22:29:13 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
18/08/20 22:29:13 INFO AbstractCoordinator: (Re-)joining group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
18/08/20 22:29:33 WARN SslTransportLayer: Failed to send SSL Close message
java.io.IOException: Broken pipe
                at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
                at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
                at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
                at sun.nio.ch.IOUtil.write(IOUtil.java:65)
                at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
                at kafkashaded.org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
                at kafkashaded.org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
                at kafkashaded.org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
                at kafkashaded.org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
                at kafkashaded.org.apache.kafka.common.network.Selector.close(Selector.java:471)
                at kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348)
                at kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:283)
                at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
                at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
                at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
                at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:214)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:212)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:303)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:302)
                at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:301)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:212)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:270)
                at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:211)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:212)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:207)
                at scala.Option.getOrElse(Option.scala:121)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:207)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:82)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:82)
                at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:89)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:364)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6$$anonfun$apply$2.apply(MicroBatchExecution.scala:358)
                at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
                at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:353)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$6.apply(MicroBatchExecution.scala:340)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
                at scala.collection.AbstractTraversable.map(Traversable.scala:104)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:340)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:336)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:563)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:336)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:189)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
                at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
                at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:172)
                at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
                at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
                at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
                at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
18/08/20 22:29:33 INFO AbstractCoordinator: Marking the coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) dead for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0
18/08/20 22:29:34 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group spark-kafka-source-1aa50598-99d1-4c53-a73c-fa6637a219b2--1338794993-driver-0.
<repeat this error over and over until the job is terminated>
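The timing of the reconnect loop can be read off the log with a short script. This is only a sketch for inspecting the excerpt above, not a diagnosis; it assumes the `yy/MM/dd HH:mm:ss LEVEL Logger: message` line layout shown, and the helper names are hypothetical. It also decodes the coordinator id, relying on the Kafka consumer registering the group coordinator under `Integer.MAX_VALUE - brokerId` so that the coordinator gets its own connection.

```python
import re
from datetime import datetime

# Excerpt of the driver log above; the long group name is shortened to "g".
LOG = """\
18/08/20 22:29:13 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group g.
18/08/20 22:29:13 INFO AbstractCoordinator: (Re-)joining group g
18/08/20 22:29:33 WARN SslTransportLayer: Failed to send SSL Close message
18/08/20 22:29:33 INFO AbstractCoordinator: Marking the coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) dead for group g
18/08/20 22:29:34 INFO AbstractCoordinator: Discovered coordinator <broker FQDN>:9093 (id: 2147483647 rack: null) for group g.
"""

# "yy/MM/dd HH:mm:ss LEVEL Logger: message"
LINE_RE = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\w+): (.*)$")
INT32_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE


def parse(log):
    """Return (timestamp, logger, message) tuples for lines matching LINE_RE."""
    events = []
    for line in log.splitlines():
        m = LINE_RE.match(line)
        if m:
            ts = datetime.strptime(m.group(1), "%y/%m/%d %H:%M:%S")
            events.append((ts, m.group(3), m.group(4)))
    return events


def seconds_between(events, start_marker, end_marker):
    """Seconds from the first start_marker line to the next end_marker line after it."""
    for i, (start, _, msg) in enumerate(events):
        if start_marker in msg:
            for end, _, later in events[i + 1:]:
                if end_marker in later:
                    return (end - start).total_seconds()
    raise ValueError("markers not found in order")


def coordinator_broker_id(coordinator_id):
    # Invert the Integer.MAX_VALUE - brokerId mapping used for the coordinator node.
    return INT32_MAX - coordinator_id


if __name__ == "__main__":
    events = parse(LOG)
    print("join -> broken pipe:",
          seconds_between(events, "(Re-)joining group", "Failed to send SSL Close message"), "s")
    print("coordinator dead -> rediscovered:",
          seconds_between(events, "dead for group", "Discovered coordinator"), "s")
    print("coordinator broker id:", coordinator_broker_id(2147483647))
```

On this excerpt it reports 20 s between joining the group and the broken pipe, 1 s before the coordinator is rediscovered, and broker id 0 for coordinator id 2147483647.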

Also, I’m not sure if it’s relevant, but I am running on Databricks (I’m currently working on running it on a local cluster to verify that it isn’t a Databricks issue). The only jars I’m using are the Spark-Kafka connector built from GitHub master on 8/8/18 and Kafka v2.0. Thanks so much for your help; let me know if there’s anything else I can provide.
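A minimal sketch of the Structured Streaming options for reaching the Event Hubs Kafka head on port 9093, assuming the SASL_SSL / PLAIN setup described in Azure's Event Hubs for Kafka documentation (user name `$ConnectionString`, namespace connection string as the password). The helper name, namespace, topic and connection string are hypothetical placeholders. The `kafkashaded.` prefix for the JAAS login module is an assumption inferred from the `kafkashaded.org.apache.kafka` frames in the stack trace above: when the Kafka client classes are relocated, `sasl.jaas.config` presumably has to name the relocated class.

```python
from typing import Dict

# Assumption: Event Hubs' Kafka endpoint is reached over SASL_SSL with the PLAIN
# mechanism, the literal user name "$ConnectionString", and the namespace
# connection string as the password.
PLAIN_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def event_hubs_kafka_options(namespace_fqdn: str, topic: str,
                             connection_string: str,
                             shaded_prefix: str = "") -> Dict[str, str]:
    """Hypothetical helper: Structured Streaming options for an Event Hubs Kafka endpoint.

    shaded_prefix: "kafkashaded." if the Kafka client classes are relocated,
    as the kafkashaded.org.apache.kafka frames in the stack trace suggest.
    """
    jaas = (
        f'{shaded_prefix}{PLAIN_LOGIN_MODULE} required '
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        # Port 9093 on the namespace FQDN, not 9092 on a broker IP.
        "kafka.bootstrap.servers": f"{namespace_fqdn}:9093",
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


if __name__ == "__main__":
    opts = event_hubs_kafka_options(
        "mynamespace.servicebus.windows.net",  # hypothetical namespace
        "mytopic",                             # hypothetical topic
        "<connection string>",                 # placeholder, never hard-code secrets
        shaded_prefix="kafkashaded.",
    )
    for key, value in opts.items():
        print(key, "=", value)
```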

Sincerely,
Basil

From: Ted Yu <yu...@gmail.com>
Sent: Friday, August 17, 2018 4:20 PM
To: Basil.Hariri@microsoft.com.invalid
Cc: dev <de...@spark.apache.org>
Subject: Re: Spark Kafka adapter questions

If you have picked up all the changes for SPARK-18057, the Kafka “broker” supporting v1.0+ should be compatible with Spark's Kafka adapter.

Can you post more details about the “failed to send SSL close message” errors?

(The default Kafka version is 2.0.0 in the Spark Kafka adapter after SPARK-18057.)

Thanks



Re: Spark Kafka adapter questions

Posted by Ted Yu <yu...@gmail.com>.
If you have picked up all the changes for SPARK-18057, the Kafka “broker”
supporting v1.0+ should be compatible with Spark's Kafka adapter.

Can you post more details about the “failed to send SSL close message”
errors?

(The default Kafka version is 2.0.0 in the Spark Kafka adapter after
SPARK-18057.)
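On the downgrade question from the original post: since Kafka 0.10.2 (KIP-97), the Java client asks each broker which version range it supports for every protocol API (the ApiVersions request, introduced by KIP-35) and uses the highest version both sides support, rather than pinning one protocol level. The sketch below models only that selection rule; the API names and version ranges are made-up illustrations, not real Kafka protocol versions, and the function names are hypothetical.

```python
from typing import Dict, Optional, Tuple

VersionRange = Tuple[int, int]  # (min_version, max_version), inclusive


def usable_version(client: VersionRange, broker: VersionRange) -> Optional[int]:
    """Highest version inside both ranges, or None if the ranges do not overlap."""
    low = max(client[0], broker[0])
    high = min(client[1], broker[1])
    return high if low <= high else None


def negotiate(client_apis: Dict[str, VersionRange],
              broker_apis: Dict[str, VersionRange]) -> Dict[str, Optional[int]]:
    """Pick a version per API; None means this client cannot use the API on this broker."""
    result: Dict[str, Optional[int]] = {}
    for api, client_range in client_apis.items():
        broker_range = broker_apis.get(api)
        result[api] = usable_version(client_range, broker_range) if broker_range is not None else None
    return result


if __name__ == "__main__":
    # Illustrative ranges only, not actual Kafka protocol versions.
    client = {"Fetch": (0, 8), "JoinGroup": (0, 3)}
    broker = {"Fetch": (4, 6), "JoinGroup": (2, 2)}
    print(negotiate(client, broker))
```

With these illustrative ranges it prints {'Fetch': 6, 'JoinGroup': 2}. An API whose ranges do not overlap comes back as None, which is the case where a real client would raise an unsupported-version error instead of silently falling back to an older protocol.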

Thanks

On Fri, Aug 17, 2018 at 3:53 PM Basil Hariri
<Ba...@microsoft.com.invalid> wrote:
