Posted to issues@flume.apache.org by "ODa (JIRA)" <ji...@apache.org> on 2019/07/11 08:33:00 UTC

[jira] [Created] (FLUME-3340) Error with kerberized HDFS sink

ODa created FLUME-3340:
--------------------------

             Summary: Error with kerberized HDFS sink
                 Key: FLUME-3340
                 URL: https://issues.apache.org/jira/browse/FLUME-3340
             Project: Flume
          Issue Type: Bug
          Components: Configuration
    Affects Versions: 1.6.0
         Environment: Flume 1.6 (embedded release in CDH 5.16.1)

Kafka 2.11-2.1.1cp1-1 (Confluent Community)
   3 brokers secured with SASL_PLAINTEXT (mechanism PLAIN)

Hadoop 2.6.0 (CDH 5.16.1), kerberized
RHEL 6.9

            Reporter: ODa


Hello,

I'm a big data administrator working with the Cloudera distribution.

We use Flume to collect external data and push it into HDFS (Hadoop).

We have an issue with the HDFS sink, which fails with these messages:

2019-07-11 09:22:12,147 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs:/app/2019-07-11/apps_flume01_2019-07-11.1562829732122.json.tmp
2019-07-11 09:22:12,441 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
java.lang.NullPointerException
 at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
 at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
 at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
 at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
 at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
 at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
 at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
 at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
 at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
2019-07-11 09:22:12,443 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException
 at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
 at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
 at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
 at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
 at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
 at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
 at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
 at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
 at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
 at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
 at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
 at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
 at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
 at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
 at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
 at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
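
I don't think this is a configuration error on our side. My reading of the stack trace (an assumption reconstructed from the frames above, not verified against the exact sources): the Kafka consumer's PLAIN login registers Kafka's SASL server provider in the JVM, and when the HDFS sink later initializes Hadoop's RPC layer, SaslRpcServer builds its FastSaslServerFactory by calling getMechanismNames(null) on every registered SaslServerFactory, and the PlainSaslServerFactory bundled with this Kafka client dereferences that null map. The call can be simulated in isolation; a minimal sketch (hypothetical class name, assuming the kafka-clients jar from the agent's classpath):

    import java.util.Map;
    import org.apache.kafka.common.security.plain.PlainSaslServer;

    public class SaslNpeRepro {
        public static void main(String[] args) {
            // Hadoop's FastSaslServerFactory enumerates every registered
            // SaslServerFactory and calls getMechanismNames(null) on each.
            // Affected kafka-clients versions read Sasl.POLICY_NOPLAINTEXT
            // from the props map without a null check, which matches the NPE
            // at PlainSaslServer.java:165 in the trace above.
            Map<String, ?> props = null;
            String[] mechs = new PlainSaslServer.PlainSaslServerFactory()
                    .getMechanismNames(props);
            System.out.println(String.join(",", mechs)); // not reached on affected versions
        }
    }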

The Kafka source (Confluent Community) is secured with SASL_PLAINTEXT.

The Flume configuration is:

agent.sources = apps
agent.channels = appsChannel
agent.sinks = appsSink

###
### Source definition
###
agent.sources.apps.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.apps.kafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

agent.sources.apps.kafka.topics = mytopic01

agent.sources.apps.kafka.consumer.client.id=ClientIDapps
agent.sources.apps.kafka.consumer.group.id=GroupIDapps

agent.sources.apps.channels = appsChannel
agent.sources.apps.batchSize=500

agent.sources.apps.interceptors = i1 hostint
agent.sources.apps.interceptors.i1.type = timestamp
agent.sources.apps.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.apps.interceptors.hostint.preserveExisting = true
agent.sources.apps.interceptors.hostint.useIP = false

agent.sources.apps.kafka.consumer.security.protocol=SASL_PLAINTEXT
agent.sources.apps.kafka.consumer.sasl.mechanism=PLAIN
agent.sources.apps.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
###
### Channel definition
###

agent.channels.appsChannel.type = memory
agent.channels.appsChannel.capacity = 500000
agent.channels.appsChannel.transactionCapacity = 1000

###
### Sink definition
###
agent.sinks.appsSink.type = hdfs
agent.sinks.appsSink.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
agent.sinks.appsSink.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
agent.sinks.appsSink.maxOpenFiles = 100
agent.sinks.appsSink.hdfs.path = hdfs:/apps/%Y-%m-%d
agent.sinks.appsSink.hdfs.filePrefix=apps_%{host}_%Y-%m-%d
agent.sinks.appsSink.hdfs.fileSuffix=.json
agent.sinks.appsSink.hdfs.rollInterval=60
agent.sinks.appsSink.hdfs.rollSize=0
agent.sinks.appsSink.hdfs.rollCount=100000
agent.sinks.appsSink.hdfs.idleTimeout=60
agent.sinks.appsSink.hdfs.callTimeout=60000
agent.sinks.appsSink.hdfs.batchSize=1000
agent.sinks.appsSink.hdfs.fileType=DataStream
agent.sinks.appsSink.hdfs.writeFormat=Writable
agent.sinks.appsSink.hdfs.useLocalTimeStamp=false
agent.sinks.appsSink.hdfs.serializer=TEXT
agent.sinks.appsSink.channel = appsChannel
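
For what it's worth, newer kafka-clients releases appear to guard against the null properties map. A paraphrased sketch of the null-safe behaviour (an assumption on my side, with a hypothetical class name, not the exact upstream code):

    import java.util.Map;
    import javax.security.sasl.Sasl;

    // Hypothetical null-safe variant of PlainSaslServerFactory.getMechanismNames:
    public class NullSafePlainFactorySketch {
        public String[] getMechanismNames(Map<String, ?> props) {
            if (props == null)  // the call Hadoop's FastSaslServerFactory makes
                return new String[]{"PLAIN"};
            String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT);
            return "true".equals(noPlainText)
                    ? new String[0]
                    : new String[]{"PLAIN"};
        }
    }

So swapping the kafka-clients jar on the Flume agent's classpath for a release that includes this guard might avoid the NPE; that is speculation on my part, not a verified fix.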


