You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Suo L. (Jira)" <ji...@apache.org> on 2021/01/19 08:51:00 UTC

[jira] [Commented] (FLINK-19132) Failed to start jobs for consuming Secure Kafka after cluster restart

    [ https://issues.apache.org/jira/browse/FLINK-19132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267757#comment-17267757 ] 

Suo L. commented on FLINK-19132:
--------------------------------

See the same issue, 1.10.2 + k8s, temporary solved by recreate a cluster.
{noformat}
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
	at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
	at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
	at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
	at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
	at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
	at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
	at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
	at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
	at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:511)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1019)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:463)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:458)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:778)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
	at java.lang.Thread.run(Thread.java:748)
{noformat}

> Failed to start jobs for consuming Secure Kafka after cluster restart
> ---------------------------------------------------------------------
>
>                 Key: FLINK-19132
>                 URL: https://issues.apache.org/jira/browse/FLINK-19132
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.1, 1.10.1
>            Reporter: Olivier Zembri
>            Priority: Major
>
> We deploy Flink jobs packaged as fat jar files compiled with Java 1.8 on a Flink session cluster in Kubernetes.
> After restarting the Kubernetes cluster, the jobs fail to start and we get several NoClassDefFoundError in the Task Manager log.
> *Stack trace*
> {color:#7a869a}{color}
> {code:java}
> java.lang.NoClassDefFoundError: org.apache.kafka.common.security.scram.ScramSaslClient
> {
>  "class": "org.apache.kafka.common.security.scram.ScramSaslClient",
>  "method": "evaluateChallenge",
>  "file": "ScramSaslClient.java",
>  "line": 128,
>  },
>  {
>  "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
>  "method": "run",
>  "file": "SaslClientAuthenticator.java",
>  "line": 280,
>  },
>  {
>  "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
>  "method": "run",
>  "file": "SaslClientAuthenticator.java",
>  "line": 278,
>  },
>  {
>  "class": "java.security.AccessController",
>  "method": "doPrivileged",
>  "file": "AccessController.java",
>  "line": -2,
>  },
>  {
>  "class": "javax.security.auth.Subject",
>  "method": "doAs",
>  "file": "Subject.java",
>  "line": 422,
>  },
>  {
>  "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
>  "method": "createSaslToken",
>  "file": "SaslClientAuthenticator.java",
>  "line": 278,
>  },
>  {
>  "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
>  "method": "sendSaslToken",
>  "file": "SaslClientAuthenticator.java",
>  "line": 215,
>  },
>  {
>  "class": "org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
>  "method": "authenticate",
>  "file": "SaslClientAuthenticator.java",
>  "line": 189,
>  },
>  {
>  "class": "org.apache.kafka.common.network.KafkaChannel",
>  "method": "prepare",
>  "file": "KafkaChannel.java",
>  "line": 76,
>  },
>  {
>  "class": "org.apache.kafka.common.network.Selector",
>  "method": "pollSelectionKeys",
>  "file": "Selector.java",
>  "line": 376,
>  },
>  {
>  "class": "org.apache.kafka.common.network.Selector",
>  "method": "poll",
>  "file": "Selector.java",
>  "line": 326,
>  },
>  {
>  "class": "org.apache.kafka.clients.NetworkClient",
>  "method": "poll",
>  "file": "NetworkClient.java",
>  "line": 433,
>  },
>  {
>  "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
>  "method": "poll",
>  "file": "ConsumerNetworkClient.java",
>  "line": 232,
>  },
>  {
>  "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
>  "method": "poll",
>  "file": "ConsumerNetworkClient.java",
>  "line": 208,
>  },
>  {
>  "class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
>  "method": "poll",
>  "file": "ConsumerNetworkClient.java",
>  "line": 184,
>  },
>  {
>  "class": "org.apache.kafka.clients.consumer.internals.Fetcher",
>  "method": "getTopicMetadata",
>  "file": "Fetcher.java",
>  "line": 314,
>  },
>  {
>  "class": "org.apache.kafka.clients.consumer.KafkaConsumer",
>  "method": "partitionsFor",
>  "file": "KafkaConsumer.java",
>  "line": 1386,
>  },
>  {
>  "class": "org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer",
>  "method": "getAllPartitionsForTopics",
>  "file": "Kafka09PartitionDiscoverer.java",
>  },
>  {
>  "class": "org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer",
>  "method": "discoverPartitions",
>  "file": "AbstractPartitionDiscoverer.java",
>  "line": 131,
>  },
>  {
>  "class": "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase",
>  "method": "open",
>  "file": "FlinkKafkaConsumerBase.java",
>  "line": 508,
>  },
>  {
>  "class": "org.apache.flink.api.common.functions.util.FunctionUtils",
>  "method": "openFunction",
>  "file": "FunctionUtils.java",
>  "line": 36,
>  },
>  {
>  "class": "org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator",
>  "method": "open",
>  "file": "AbstractUdfStreamOperator.java",
>  "line": 102,
>  },
>  {
>  "class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>  "method": "openAllOperators",
>  "file": "StreamTask.java",
>  "line": 532,
>  },
>  {
>  "class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>  "method": "invoke",
>  "file": "StreamTask.java",
>  "line": 396,
>  },
>  {
>  "class": "org.apache.flink.runtime.taskmanager.Task",
>  "method": "doRun",
>  "file": "Task.java",
>  "line": 705,
>  },
>  {
>  "class": "org.apache.flink.runtime.taskmanager.Task",
>  "method": "run",
>  "file": "Task.java",
>  "line": 530,
>  },
>  {
>  "class": "java.lang.Thread",
>  "method": "run",
>  "file": "Thread.java",
>  "line": 748,
>  }{code}
> {color:#7a869a} {color}
> *Workaround:*
> - Copy the jar file containing the missing classes in the /lib folder
>       /opt/flink/lib/kafka-clients-0.11.0.jar
> - Update the [flink-conf.yaml|https://github.ibm.com/dba/taiga-flink/blob/master/conf/flink-conf.yaml] with
> {{classloader.parent-first-patterns.additional: org.apache.kafka}}
> {{_Note:_ This issue is very similar to https://issues.apache.org/jira/browse/FLINK-14012.}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)