Posted to issues@flink.apache.org by "Suo L. (Jira)" <ji...@apache.org> on 2021/01/19 08:51:00 UTC
[jira] [Commented] (FLINK-19132) Failed to start jobs for consuming Secure Kafka after cluster restart
[ https://issues.apache.org/jira/browse/FLINK-19132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267757#comment-17267757 ]
Suo L. commented on FLINK-19132:
--------------------------------
Seeing the same issue on 1.10.2 + Kubernetes; temporarily solved by recreating the cluster.
{noformat}
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:511)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1019)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:463)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:458)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:778)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:748)
{noformat}
> Failed to start jobs for consuming Secure Kafka after cluster restart
> ---------------------------------------------------------------------
>
> Key: FLINK-19132
> URL: https://issues.apache.org/jira/browse/FLINK-19132
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.9.1, 1.10.1
> Reporter: Olivier Zembri
> Priority: Major
>
> We deploy Flink jobs packaged as fat jar files compiled with Java 1.8 on a Flink session cluster in Kubernetes.
> After restarting the Kubernetes cluster, the jobs fail to start and we see several NoClassDefFoundError exceptions in the Task Manager log.
> *Stack trace*
> {code:java}
> java.lang.NoClassDefFoundError: org.apache.kafka.common.security.scram.ScramSaslClient
> {
> "class": "org.apache.kafka.common.security.scram.ScramSaslClient",
> "method": "evaluateChallenge",
> "file": "ScramSaslClient.java",
> "line": 128,
> },
> {
> "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
> "method": "run",
> "file": "SaslClientAuthenticator.java",
> "line": 280,
> },
> {
> "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
> "method": "run",
> "file": "SaslClientAuthenticator.java",
> "line": 278,
> },
> {
> "class": "java.security.AccessController",
> "method": "doPrivileged",
> "file": "AccessController.java",
> "line": -2,
> },
> {
> "class": "javax.security.auth.Subject",
> "method": "doAs",
> "file": "Subject.java",
> "line": 422,
> },
> {
> "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
> "method": "createSaslToken",
> "file": "SaslClientAuthenticator.java",
> "line": 278,
> },
> {
> "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
> "method": "sendSaslToken",
> "file": "SaslClientAuthenticator.java",
> "line": 215,
> },
> {
> "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
> "method": "authenticate",
> "file": "SaslClientAuthenticator.java",
> "line": 189,
> },
> {
> "class": "org.apache.kafka.common.network.KafkaChannel",
> "method": "prepare",
> "file": "KafkaChannel.java",
> "line": 76,
> },
> {
> "class": "org.apache.kafka.common.network.Selector",
> "method": "pollSelectionKeys",
> "file": "Selector.java",
> "line": 376,
> },
> {
> "class": "org.apache.kafka.common.network.Selector",
> "method": "poll",
> "file": "Selector.java",
> "line": 326,
> },
> {
> "class": "org.apache.kafka.clients.NetworkClient",
> "method": "poll",
> "file": "NetworkClient.java",
> "line": 433,
> },
> {
> "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
> "method": "poll",
> "file": "ConsumerNetworkClient.java",
> "line": 232,
> },
> {
> "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
> "method": "poll",
> "file": "ConsumerNetworkClient.java",
> "line": 208,
> },
> {
> "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
> "method": "poll",
> "file": "ConsumerNetworkClient.java",
> "line": 184,
> },
> {
> "class": "org.apache.kafka.clients.consumer.internals.Fetcher",
> "method": "getTopicMetadata",
> "file": "Fetcher.java",
> "line": 314,
> },
> {
> "class": "org.apache.kafka.clients.consumer.KafkaConsumer",
> "method": "partitionsFor",
> "file": "KafkaConsumer.java",
> "line": 1386,
> },
> {
> "class": "org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer",
> "method": "getAllPartitionsForTopics",
> "file": "Kafka09PartitionDiscoverer.java",
> },
> {
> "class": "org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer",
> "method": "discoverPartitions",
> "file": "AbstractPartitionDiscoverer.java",
> "line": 131,
> },
> {
> "class": "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase",
> "method": "open",
> "file": "FlinkKafkaConsumerBase.java",
> "line": 508,
> },
> {
> "class": "org.apache.flink.api.common.functions.util.FunctionUtils",
> "method": "openFunction",
> "file": "FunctionUtils.java",
> "line": 36,
> },
> {
> "class": "org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator",
> "method": "open",
> "file": "AbstractUdfStreamOperator.java",
> "line": 102,
> },
> {
> "class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> "method": "openAllOperators",
> "file": "StreamTask.java",
> "line": 532,
> },
> {
> "class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> "method": "invoke",
> "file": "StreamTask.java",
> "line": 396,
> },
> {
> "class": "org.apache.flink.runtime.taskmanager.Task",
> "method": "doRun",
> "file": "Task.java",
> "line": 705,
> },
> {
> "class": "org.apache.flink.runtime.taskmanager.Task",
> "method": "run",
> "file": "Task.java",
> "line": 530,
> },
> {
> "class": "java.lang.Thread",
> "method": "run",
> "file": "Thread.java",
> "line": 748,
> }{code}
> *Workaround:*
> - Copy the jar file containing the missing classes into the /lib folder, e.g.
> /opt/flink/lib/kafka-clients-0.11.0.jar
> - Update the [flink-conf.yaml|https://github.ibm.com/dba/taiga-flink/blob/master/conf/flink-conf.yaml] with
> {{classloader.parent-first-patterns.additional: org.apache.kafka}}
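> Sketched as shell commands, the workaround amounts to roughly the following (the install path and jar version here are assumptions based on a typical Flink container image; adjust to your deployment):
> {code:bash}
> # Assumed install location of the Flink distribution inside the image.
> FLINK_HOME=${FLINK_HOME:-/opt/flink}
>
> # Ship the Kafka client classes with the cluster itself, not only inside the job's fat jar.
> cp kafka-clients-0.11.0.jar "$FLINK_HOME/lib/"
>
> # Resolve org.apache.kafka classes parent-first, so they are loaded by the long-lived
> # system classloader instead of the per-job user-code classloader.
> echo "classloader.parent-first-patterns.additional: org.apache.kafka" \
>   >> "$FLINK_HOME/conf/flink-conf.yaml"
> {code}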
> {{_Note:_ This issue is very similar to https://issues.apache.org/jira/browse/FLINK-14012.}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)