Posted to issues@spark.apache.org by "Prabhjot Singh Bharaj (JIRA)" <ji...@apache.org> on 2019/04/11 15:11:00 UTC
[jira] [Comment Edited] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815491#comment-16815491 ]
Prabhjot Singh Bharaj edited comment on SPARK-27409 at 4/11/19 3:10 PM:
------------------------------------------------------------------------
{code:java}
In [5]: df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",)\
...: .option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
2019-04-11 15:04:49.903 INFO ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-a944d4a9-0257-492a-88b4-bba9decebb28-876181411-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = non-existent
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/<redacted>/ in <module>()
----> 1 df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
/<redacted>/python/pyspark/sql/streaming.pyc in load(self, path, format, schema, **options)
401 return self._df(self._jreader.load(path))
402 else:
--> 403 return self._df(self._jreader.load())
404
405 @since(2.0)
/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/<redacted>/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/<redacted>/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o147.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
... 23 more
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
... 24 more{code}
Going through these logs, notice the calls to `createContinuousReader`. This seems like a bug, or at least undocumented behavior. My understanding was that it should go through `createMicroBatchReader`, but it isn't.
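My guess at what is happening, sketched in plain Python (this is my own model of the dispatch, not Spark's actual code): `DataStreamReader.load()` has to infer a schema before any trigger has been chosen, and for a source that advertises continuous support it constructs the continuous reader to do so; constructing that reader builds a `KafkaConsumer`, which is where the missing keystore surfaces as `FileNotFoundException`.

```python
# Hypothetical model of DataStreamReader.load() in Spark 2.3 -- an
# illustration, not the real implementation. The class and method names
# mirror the stack trace above.

class ContinuousReadSupport:
    pass

class MicroBatchReadSupport:
    pass

class KafkaSourceProvider(ContinuousReadSupport, MicroBatchReadSupport):
    def create_continuous_reader(self, options):
        # In the real code this builds a KafkaOffsetReader, which
        # constructs a KafkaConsumer -- the point where the SSL
        # keystore file is opened and the error is raised.
        return "continuous-reader"

    def create_micro_batch_reader(self, options):
        return "micro-batch-reader"

def load(source, options):
    # Schema inference happens at load() time, before the trigger is
    # known. A source matching ContinuousReadSupport is checked first,
    # so the continuous reader is created even for micro-batch queries.
    if isinstance(source, ContinuousReadSupport):
        return source.create_continuous_reader(options)
    if isinstance(source, MicroBatchReadSupport):
        return source.create_micro_batch_reader(options)
    raise ValueError("unsupported source")

print(load(KafkaSourceProvider(), {}))  # -> continuous-reader
```

If this model is right, the `createContinuousReader` frames in the trace say nothing about which execution mode the query would eventually run in; they only reflect which interface was matched first during schema inference.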
> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
> Key: SPARK-27409
> URL: https://issues.apache.org/jira/browse/SPARK-27409
> Project: Spark
> Issue Type: Question
> Components: Structured Streaming
> Affects Versions: 2.3.2
> Reporter: Prabhjot Singh Bharaj
> Priority: Major
>
> It seems that with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] - in Spark 2.3 for the Kafka Source Provider, a Kafka source cannot be run in micro-batch mode, only in continuous mode. Is that understanding correct?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
> When running a simple data stream loader for Kafka without an SSL cert, it goes through this code path -
>
> {code:java}
> ...
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>
> Note that I haven't selected `trigger=continuous...` when creating the dataframe, yet the code still goes through the continuous path. My understanding was that `continuous` is optional, not the default.
>
> Please clarify.
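For context on the reporter's point: the execution mode is selected by the trigger passed on the write side (`writeStream.trigger(...)`), which `load()` cannot see. A plain-Python sketch of that selection logic (my own model, not Spark code; the function name is hypothetical):

```python
# Sketch of trigger-based mode selection: continuous execution must be
# requested explicitly; micro-batch is the default. This models the
# behavior described in the Structured Streaming docs, not Spark's
# actual implementation.

def pick_execution(trigger=None):
    """Return the execution mode implied by a trigger string supplied
    when the query starts, e.g. writeStream.trigger(...)."""
    if trigger is not None and trigger.startswith("continuous"):
        return "ContinuousExecution"
    # No trigger, or a processingTime/once trigger: micro-batch.
    return "MicroBatchExecution"

print(pick_execution())                       # -> MicroBatchExecution
print(pick_execution("continuous=1 second"))  # -> ContinuousExecution
```

This is consistent with the reporter's understanding that continuous mode is opt-in; the `createContinuousReader` calls at `load()` time happen before any trigger is known.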
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org