You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "谢波 (Jira)" <ji...@apache.org> on 2020/12/10 08:59:00 UTC
[jira] [Issue Comment Deleted] (FLINK-20377) flink-1.11.2 -kerberos
config on kafka connector not working
[ https://issues.apache.org/jira/browse/FLINK-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
谢波 updated FLINK-20377:
-----------------------
Comment: was deleted
(was: flink 1.12 documents not show the properties.* options.)
> flink-1.11.2 -kerberos config on kafka connector not working
> ------------------------------------------------------------
>
> Key: FLINK-20377
> URL: https://issues.apache.org/jira/browse/FLINK-20377
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.11.2
> Environment: flink on yarn
> kafka with kerberos
> flink-1.11.2_2.11
> Reporter: 谢波
> Priority: Major
> Fix For: 1.12.0
>
>
> I refer to the configuration on the official website to configure Kafka and flink-conf.yaml ,but the configuration does not work.
> my table config :
> WITH (
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'xxxxxxxx',
> 'topic' = 'kafka_hepecc_ekko_cut_json',
> 'properties.group.id' = 'ekko.group',
> 'properties.security.protocol' = 'SASL_PLAINTEXT',
> 'properties.sasl.kerberos.service.name' = 'kafka',
> – 'properties.sasl.mechanism' = 'GSSAPI',
> 'format' = 'json'
> );
> yaml:
> security.kerberos.login.use-ticket-cache: false
> security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
> security.kerberos.login.principal: xiebo@xxx.CN
> # The configuration below defines which JAAS login contexts
> security.kerberos.login.contexts: Client,KafkaClient
>
> dir content:
> [xiebo@ww021 keytab]$ pwd
> /home/xiebo/module/flink/keytab
> [xiebo@ww021 keytab]$ ll
> total 12
> -rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
> -rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
> -rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab
>
> I get an error:
>
> 2020-11-26 19:01:55
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
> at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
> at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
> at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
> at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
> ... 23 more
> Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user
> at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:901)
> at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764)
> at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
> at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103)
> at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
>
> reference :
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#enabling-kerberos-authentication]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)