Posted to issues@flume.apache.org by "ODa (JIRA)" <ji...@apache.org> on 2019/07/11 08:33:00 UTC
[jira] [Created] (FLUME-3340) Error with HDFS SINK kerberized
ODa created FLUME-3340:
--------------------------
Summary: Error with HDFS SINK kerberized
Key: FLUME-3340
URL: https://issues.apache.org/jira/browse/FLUME-3340
Project: Flume
Issue Type: Choose from below ...
Components: Configuration
Affects Versions: 1.6.0
Environment: Flume 1.6 (embedded release in CDH 15.16.1)
Kafka 2.11-2.1.1cp1-1 (Confluent Community)
3 secured brokers with SASL_PLAINTEXT (mechanism PLAIN)
Hadoop 2.6.0 (CDH 15.6.1), kerberized
RHEL 6.9
Reporter: ODa
Hello,
I'm a big data administrator working with the Cloudera distribution.
We use Flume to collect external data and push it into HDFS (Hadoop).
We have an issue with the HDFS sink, which logs these messages:
2019-07-11 09:22:12,147 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs:/app/2019-07-11/apps_flume01_2019-07-11.1562829732122.json.tmp
2019-07-11 09:22:12,441 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-07-11 09:22:12,443 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
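For reference, the failure pattern in the trace is that Hadoop's FastSaslServerFactory enumerates every registered SaslServerFactory by calling getMechanismNames(null), and Kafka's PlainSaslServerFactory dereferences that props map without a null check. The following minimal, self-contained sketch reproduces the pattern; PlainFactory is a hypothetical stand-in, not Kafka's actual class:

```java
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;

// Hypothetical stand-in for Kafka's PlainSaslServer.PlainSaslServerFactory:
// it reads a policy flag from the props map without a null check.
class PlainFactory implements SaslServerFactory {
    @Override
    public SaslServer createSaslServer(String mechanism, String protocol,
            String serverName, Map<String, ?> props, CallbackHandler cbh) {
        return null; // not needed for this sketch
    }

    @Override
    public String[] getMechanismNames(Map<String, ?> props) {
        // NPE here when props is null, which is what a caller like
        // Hadoop's FastSaslServerFactory passes while enumerating
        // all registered factories.
        if ("true".equals(props.get(Sasl.POLICY_NOPLAINTEXT)))
            return new String[0];
        return new String[] { "PLAIN" };
    }
}

public class NpeSketch {
    public static void main(String[] args) {
        SaslServerFactory f = new PlainFactory();
        try {
            // Mimics SaslRpcServer$FastSaslServerFactory.<init>
            f.getMechanismNames(null);
            System.out.println("no NPE");
        } catch (NullPointerException e) {
            System.out.println("NPE from getMechanismNames(null)");
        }
    }
}
```

This would explain why the NPE surfaces only when both the Kafka client (which registers the PLAIN factory) and the kerberized HDFS client (which scans all SASL factories) live in the same JVM, as in this Flume agent.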
The Kafka source (Confluent Community) is secured with SASL_PLAINTEXT.
The Flume configuration is:
agent.sources = apps
agent.channels = appsChannel
agent.sinks = appsSink
###
### Source definition
###
agent.sources.apps.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.apps.kafka.bootstrap.servers =kafka1:9092,kafka2:9092,kafka3:9092
agent.sources.apps.kafka.topics = mytopic01
agent.sources.apps.kafka.consumer.client.id=ClientIDapps
agent.sources.apps.kafka.consumer.group.id=GroupIDapps
agent.sources.apps.channels = appsChannel
agent.sources.apps.batchSize=500
agent.sources.apps.interceptors = i1 hostint
agent.sources.apps.interceptors.i1.type = timestamp
agent.sources.apps.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.apps.interceptors.hostint.preserveExisting = true
agent.sources.apps.interceptors.hostint.useIP = false
agent.sources.apps.kafka.consumer.security.protocol=SASL_PLAINTEXT
agent.sources.apps.kafka.consumer.sasl.mechanism=PLAIN
agent.sources.apps.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
###
### Channel definition
###
agent.channels.appsChannel.type = memory
agent.channels.appsChannel.capacity = 500000
agent.channels.appsChannel.transactionCapacity = 1000
###
### Sink definition
agent.sinks.appsSink.type = hdfs
agent.sinks.appsSink.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
agent.sinks.appsSink.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
agent.sinks.appsSink.maxOpenFiles = 100
agent.sinks.appsSink.hdfs.path = hdfs:/apps/%Y-%m-%d
agent.sinks.appsSink.hdfs.filePrefix=apps_%\{host}_%Y-%m-%d
agent.sinks.appsSink.hdfs.fileSuffix=.json
agent.sinks.appsSink.hdfs.rollInterval=60
agent.sinks.appsSink.hdfs.rollSize=0
agent.sinks.appsSink.hdfs.rollCount=100000
agent.sinks.appsSink.hdfs.idleTimeout=60
agent.sinks.appsSink.hdfs.callTimeout=60000
agent.sinks.appsSink.hdfs.batchSize=1000
agent.sinks.appsSink.hdfs.fileType=DataStream
agent.sinks.appsSink.hdfs.writeFormat=Writable
agent.sinks.appsSink.hdfs.useLocalTimeStamp=false
agent.sinks.appsSink.hdfs.serializer=TEXT
agent.sinks.appsSink.channel = appsChannel
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)