Posted to user@beam.apache.org by Utkarsh Parekh <ut...@gmail.com> on 2022/02/01 01:27:03 UTC

Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Hello,

I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks. I'm
trying to create a simple streaming app with Apache Beam that reads data
from one Azure Event Hub and produces messages into another Azure Event Hub.

I'm creating and running the Spark jobs on Azure Databricks.

The problem is that the consumer (which uses SparkRunner) is not able to
read data from the Event Hub (queue). There is no activity and no errors on
the Spark cluster.
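
For reference, a minimal sketch of such a pipeline against the Event Hubs
Kafka endpoint looks roughly like this (the namespace, topic names, and
JAAS string below are placeholders, not the real values):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EventHubToEventHub {
  public static void main(String[] args) {
    // Run with --runner=SparkRunner to execute on the Databricks Spark cluster.
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Event Hubs exposes a Kafka-compatible endpoint on port 9093.
    Map<String, Object> security = new HashMap<>();
    security.put("security.protocol", "SASL_SSL");
    security.put("sasl.mechanism", "PLAIN");
    security.put("sasl.jaas.config",
        "<JAAS config built from the Event Hubs connection string>");

    p.apply("ReadFromEventHub", KafkaIO.<String, String>read()
            .withBootstrapServers("mynamespace.servicebus.windows.net:9093")
            .withTopic("input-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(security)
            .withoutMetadata()) // yields PCollection<KV<String, String>>
     .apply("WriteToEventHub", KafkaIO.<String, String>write()
            .withBootstrapServers("mynamespace.servicebus.windows.net:9093")
            .withTopic("output-topic")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withProducerConfigUpdates(security));

    p.run().waitUntilFinish();
  }
}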

I would appreciate it if anyone could help to fix this issue.

Thank you

Utkarsh

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Alexey Romanenko <ar...@gmail.com>.
Thank you for the quick answers, Utkarsh, but unfortunately I don’t see the real cause of this right now. It seems it will require some remote debugging on your side to see what the workers are actually doing.
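
One common way to do that is the standard JDWP agent on the executors, set through the cluster’s Spark config (a sketch; the port is arbitrary and the cluster network must allow you to reach it):

spark.executor.extraJavaOptions -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005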


> On 1 Feb 2022, at 22:59, Utkarsh Parekh <ut...@gmail.com> wrote:
> [...]


Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Utkarsh Parekh <ut...@gmail.com>.
I also get this error occasionally when I execute a streaming pipeline
with a new cluster instead of an existing cluster.

https://issues.apache.org/jira/browse/BEAM-12032?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Tue, Feb 1, 2022 at 1:59 PM Utkarsh Parekh <ut...@gmail.com> wrote:

> [...]

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Utkarsh Parekh <ut...@gmail.com>.
If you tested earlier with the same stack, which version did you use?

*Can you enable debug logs to check what’s happening there? *So far the
following warning is all I received from log4j on Databricks (no errors
other than that).

*Can you make sure that there is no issue with firewall or something? *No,
I don't think so, because it works fine both locally and in a Databricks
notebook.

*Can you run this pipeline locally against a real Kafka server, not Azure
Event Hub, to make sure that it works fine? *Yes, it works fine with both
Azure EventHub and Kafka.


org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:93)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
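
The truncated first line of that trace reads like a NoSuchMethodError on Spring's DefaultConversionService.getSharedInstance(), thrown from Beam's ConsumerSpEL, which would suggest an older spring-core on the Databricks classpath shadowing the version KafkaIO expects. If that is the cause, one possible workaround (a sketch, not a verified fix for this cluster) is to relocate Spring inside the job's uber jar with the maven-shade-plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <relocations>
                    <!-- Hide the job's Spring classes from the cluster's copy. -->
                    <relocation>
                        <pattern>org.springframework</pattern>
                        <shadedPattern>shaded.org.springframework</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>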

On Tue, Feb 1, 2022 at 12:07 PM Alexey Romanenko <ar...@gmail.com> wrote:

> [...]

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Alexey Romanenko <ar...@gmail.com>.
Well, personally I didn’t test with this version, but it should be fine…
Can you enable debug logs to check what’s happening there?
Can you make sure that there is no issue with a firewall or something?
Can you run this pipeline locally against a real Kafka server, not Azure Event Hub, to make sure that it works fine?
Otherwise, you would need to debug the worker process remotely.

> On 1 Feb 2022, at 19:18, Utkarsh Parekh <ut...@gmail.com> wrote:
> [...]


Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Utkarsh Parekh <ut...@gmail.com>.
Sorry, I sent the last message in a hurry. Here is the Beam Java Kafka IO
dependency: is something missing here?

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>2.35.0</version>
</dependency>
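
For completeness, the Spark runner dependency would sit alongside it; for Beam 2.35.0 on a Spark 3 cluster that would presumably be the following (the artifact name is an assumption for Beam's Spark 3 runner, worth double-checking against the Beam docs):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark-3</artifactId>
    <version>2.35.0</version>
</dependency>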


On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh <ut...@gmail.com> wrote:

> [...]

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Utkarsh Parekh <ut...@gmail.com>.
Here it is

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>


On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko <ar...@gmail.com> wrote:

> [...]

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Alexey Romanenko <ar...@gmail.com>.
Hmm, this is strange. Which version of the Kafka client do you use while running it with Beam?

> On 1 Feb 2022, at 16:56, Utkarsh Parekh <ut...@gmail.com> wrote:
> [...]


Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Utkarsh Parekh <ut...@gmail.com>.
Hi Alexey,

First of all, thank you for the response! Yes, I did have that in the
consumer configuration and tried increasing "session.timeout.ms".

From the consumer side, I have the following settings so far:

props.put("sasl.mechanism", SASL_MECHANISM);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.jaas.config", saslJaasConfig);
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 60000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
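
In the Beam pipeline, those same properties go to KafkaIO through the consumer config map, roughly like this (a sketch, assuming props is a Map<String, Object> and the same String deserializers as in the notebook):

KafkaIO.<String, String>read()
    .withBootstrapServers(BOOTSTRAP_SERVERS)
    .withTopic(TOPIC)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(props);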


It works fine using the following code in a Databricks notebook. The problem
occurs when I run it through Apache Beam and KafkaIO (just providing more
context in case it helps you understand the problem):

val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "60000")
    .option("failOnDataLoss", "false")
    // .option("kafka.group.id", "testsink")
    .option("startingOffsets", "latest")
    .load()

Utkarsh

On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <ar...@gmail.com> wrote:

> [...]

Re: Running a small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Utkarsh,

Can it be related to this configuration problem?
https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
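
For comparison, the Kafka-endpoint settings for Event Hubs that the guide checks usually look like this (namespace and key below are placeholders):

bootstrap.servers=mynamespace.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...";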

Did you check timeout settings?

—
Alexey


> On 1 Feb 2022, at 02:27, Utkarsh Parekh <ut...@gmail.com> wrote:
> [...]