Posted to issues@flink.apache.org by "谢波 (Jira)" <ji...@apache.org> on 2020/12/10 08:53:00 UTC

[jira] [Comment Edited] (FLINK-20377) flink-1.11.2 -kerberos config on kafka connector not working

    [ https://issues.apache.org/jira/browse/FLINK-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247094#comment-17247094 ] 

谢波 edited comment on FLINK-20377 at 12/10/20, 8:52 AM:
-------------------------------------------------------

The Flink 1.12 documentation does not show the properties.* options.
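
For context, a minimal sketch of what the properties.* options look like on the Kafka SQL connector; the table name, topic, broker address, and group id below are hypothetical. Any option prefixed with 'properties.' is forwarded to the underlying Kafka client with the prefix stripped.

-- minimal sketch, hypothetical names: 'properties.*' keys are passed
-- straight to the Kafka consumer/producer config with the prefix removed
CREATE TABLE kafka_source (
  id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'some_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'some.group',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'format' = 'json'
);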


was (Author: hiscat):
Flink 1.12 does not show the properties.* options.

> flink-1.11.2 -kerberos config on kafka connector not working
> ------------------------------------------------------------
>
>                 Key: FLINK-20377
>                 URL: https://issues.apache.org/jira/browse/FLINK-20377
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>         Environment: flink on yarn
> kafka with kerberos 
> flink-1.11.2_2.11
>            Reporter: 谢波
>            Priority: Major
>             Fix For: 1.12.0
>
>
> I referred to the configuration on the official website to configure Kafka and flink-conf.yaml, but the configuration does not work.
> My table config:
> WITH (
>  'connector' = 'kafka',
>  'properties.bootstrap.servers' = 'xxxxxxxx',
>  'topic' = 'kafka_hepecc_ekko_cut_json',
>  'properties.group.id' = 'ekko.group',
>  'properties.security.protocol' = 'SASL_PLAINTEXT',
>  'properties.sasl.kerberos.service.name' = 'kafka',
>  – 'properties.sasl.mechanism' = 'GSSAPI',
>  'format' = 'json'
>  );
> flink-conf.yaml:
> security.kerberos.login.use-ticket-cache: false
> security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
> security.kerberos.login.principal: xiebo@xxx.CN
> # The configuration below defines which JAAS login contexts
> security.kerberos.login.contexts: Client,KafkaClient
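> For reference, a minimal sketch of the KafkaClient JAAS entry these settings are expected to produce (the keytab path and principal are copied from the config above; the exact login options in the file Flink generates may differ):
>
> KafkaClient {
>   com.sun.security.auth.module.Krb5LoginModule required
>   useKeyTab=true
>   storeKey=true
>   doNotPrompt=true
>   keyTab="/home/xiebo/module/flink/keytab/xiebo.keytab"
>   principal="xiebo@xxx.CN";
> };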
>  
> Directory contents:
> [xiebo@ww021 keytab]$ pwd
>  /home/xiebo/module/flink/keytab
>  [xiebo@ww021 keytab]$ ll 
>  total 12
>  -rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
>  -rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
>  -rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab
>  
> I get an error:
>  
>  2020-11-26 19:01:55
>  org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
>      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>      at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
>      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
>      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111)
>      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
>      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
>      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>      at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>      at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>      at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>      at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>      at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>      at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>      at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
>      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
>      ... 23 more
>  Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user
>     at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:901)
>      at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764)
>      at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>      at java.lang.reflect.Method.invoke(Method.java:498)
>      at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
>      at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
>      at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
>      at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
>      at java.security.AccessController.doPrivileged(Native Method)
>      at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
>      at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
>      at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>      at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103)
>      at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>      at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>      at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
>   
> reference :
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems]
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#enabling-kerberos-authentication]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)