You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Rahul Ravindran <ra...@yahoo.com> on 2013/01/14 22:42:05 UTC

HDFSsink failover error

Hi,
   I am attempting to setup an HDFS sink such that when a namenode failover occurs (active namenode is brought down and the standby namenode switches to active), the failover sink would send events to the new active namenode. I see an error about WRITE not supported in standby state..Does this not count as a failure for the failover sink?
Thanks,
~Rahul.

My config is as follows:

agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000

agent1.sinks.hdfs-sink1-back.channel = ch1
agent1.sinks.hdfs-sink1-back.type = hdfs
agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000

agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
agent1.sinkgroups.failoverGroup1.processor.type = failover
#higher number in priority is higher priority
agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
#failover if failure detected for 10 seconds
agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000


agent1.sinkgroups = failoverGroup1


14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/....
14 Jan 2013 21:37:28,834 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)

        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
        at $Proxy11.create(Unknown Source)
        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho

Re: HDFSsink failover error

Posted by Brock Noland <br...@cloudera.com>.
This is https://issues.apache.org/jira/browse/FLUME-1779

On Mon, Jan 14, 2013 at 4:25 PM, Connor Woodson <cw...@gmail.com> wrote:
> Oh alright, found it. What is happening is that the HDFS sink does not throw
> an exception for this write error, but instead returns a Status.BACKOFF, and
> as such the failover processor doesn't think this sink failed.
>
> (What is strange is that the processor deals with the backoff message for
> failed sinks, but not active sinks).
>
> So until that's fixed there isn't a clean way to fix this. The best solution
> I can offer is to get the source code for Flume (either the latest one or
> the 1.3.1 tag), and make the following change:
>
> In the process method of the HDFSEventSink, in the catch-statements, change:
>
> LOG.warn("HDFS IO error", eIO);
> return Status.BACKOFF;
>
>
> to:
>
> LOG.warn("HDFS IO error", eIO);
> throw eIO;
>
>
> (Line 457 in 1.3.1 or line 454 in trunk)
>
> What this will end up doing is if you ever use the sink outside of a
> failover processor, when there is a write error then the sink will throw an
> exception and it will probably stop - so, this change will only make it able
> to work within the failover sink processor. Optionally, you could make a
> copy of the HDFSEventSink (call it FailoverHDFSEventSink if you want) and
> put that change in it, so that you can have both versions of the sink.
>
> (if you want instructions on compiling Flume after this change, look for the
> thread 'custome serializer')
>
> Unfortunate issue, but it will be fixed in 1.4 I'm sure.
>
> - Connor
>
> On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <ra...@yahoo.com> wrote:
>>
>> Here is the entire log file after I restart flume
>>
>> ________________________________
>> From: Connor Woodson <cw...@gmail.com>
>> To: "user@flume.apache.org" <us...@flume.apache.org>; Rahul Ravindran
>> <ra...@yahoo.com>
>> Sent: Monday, January 14, 2013 3:51 PM
>>
>> Subject: Re: HDFSsink failover error
>>
>> Can you look at the full log file and post the above section as well as
>> 5-10 lines above/below it (you don't have to post that stack trace if you
>> don't want)? Because that error, while it should definitely be logged,
>> should be followed by some error lines giving context as to what is going
>> on. And if that is the end of the log file then...well, that just shouldn't
>> happen, as there are several different places that would have produced log
>> messages as that exception propagates
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com>
>> wrote:
>>
>> The writes to the backup were successful when I attempted to write to it
>> directly but not via the failover sink processor. I did not see the warning
>> that you mentioned about "Sink hdfs-sink1failed".
>>
>> The full log trace is below:
>>
>> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
>> 14 Jan 2013 22:48:24,739 WARN
>> [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:616)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>>         at $Proxy11.create(Unknown Source)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>>        at
>> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>>         at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:679)
>>
>> ________________________________
>> From: Connor Woodson <cw...@gmail.com>
>> To: Rahul Ravindran <ra...@yahoo.com>
>> Cc: "user@flume.apache.org" <us...@flume.apache.org>
>> Sent: Monday, January 14, 2013 3:05 PM
>>
>> Subject: Re: HDFSsink failover error
>>
>> So you are able to write normally to the back-up HDFS servers? And that
>> error you got was when you were trying to write to the normal server? Was it
>> supposed to be an error (it looks like it's due to how your Hadoop is set
>> up)?
>>
>> The log lines you pasted make it look like there was a problem with your
>> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
>> what should have happened is that the event was then written to the backup
>> server. Below the stack trace there should probably have been another WARN
>> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
>> list". And if hdfs-sink1-back then was unable to write, you would see a
>> thrown EventDeliveryException in your log.
>>
>> If there isn't anything else in the log, and the event wasn't written to
>> the backup server, then that would be a bug.
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com>
>> wrote:
>>
>> Here is the full config. I swapped the priorities on the sink processor
>> after performing the namenode failiver and the writes were successful to the
>> newly active namenode.
>>
>> agent1.channels.ch1.type = FILE
>> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>> agent1.channels.ch1.dataDirs = /flume_runtime/data
>>
>>
>> agent1.channels.ch2.type = FILE
>> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>>
>>
>>
>> # Define an Avro source called avro-source1 on agent1 and tell it
>>
>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>
>> agent1.sources.avro-source1.channels = ch1
>> agent1.sources.avro-source1.type = avro
>> agent1.sources.avro-source1.bind = 0.0.0.0
>> agent1.sources.avro-source1.port = 4545
>>
>>
>>
>> agent1.sources.avro-source2.channels = ch2
>> agent1.sources.avro-source2.type = avro
>> agent1.sources.avro-source2.bind = 0.0.0.0
>> agent1.sources.avro-source2.port = 4546
>>
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2.channel = ch2
>> agent1.sinks.hdfs-sink2.type = hdfs
>> agent1.sinks.hdfs-sink2.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>>
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2-back.channel = ch2
>> agent1.sinks.hdfs-sink2-back.type = hdfs
>> agent1.sinks.hdfs-sink2-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>>
>>
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>> agent1.sinkgroups.failoverGroup2.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>>
>> # Finally, now that we've defined all of our components, tell
>> # agent1 which ones we want to activate.
>> agent1.sinkgroups = failoverGroup1 failoverGroup2
>> agent1.channels = ch1 ch2
>> agent1.sources = avro-source1 avro-source2
>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>>
>> ________________________________
>> From: Connor Woodson <cw...@gmail.com>
>> To: user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
>> Sent: Monday, January 14, 2013 2:28 PM
>> Subject: Re: HDFSsink failover error
>>
>> I assume that's only part of your config as it's missing a source; if you
>> get rid of the sink processor, can you write to each hdfs sink individually?
>> (comment one out at a time)
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>
>> wrote:
>>
>> Hi,
>>    I am attempting to setup an HDFS sink such that when a namenode
>> failover occurs (active namenode is brought down and the standby namenode
>> switches to active), the failover sink would send events to the new active
>> namenode. I see an error about WRITE not supported in standby state..Does
>> this not count as a failure for the failover sink?
>> Thanks,
>> ~Rahul.
>>
>> My config is as follows:
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups = failoverGroup1
>>
>> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/....
>> 14 Jan 2013 21:37:28,834 WARN
>> [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:616)
>>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: HDFSsink failover error

Posted by Brock Noland <br...@cloudera.com>.
Hi,

Yeah I think this is:

https://issues.apache.org/jira/browse/FLUME-1779

Brock


On Mon, Jan 14, 2013 at 4:28 PM, Connor Woodson <cw...@gmail.com> wrote:
> Forwarding from the user list. The bug here is that the HDFSEventSink will
> not work in a FailoverSinkProcessor. What is going on is that when there is
> an IOException, the HDFSEventSink will return Status.BACKOFF; however, in
> the failover processor, a sink is only failed if it throws an exception. So
> maybe the next process call to the HDFSEventSink will write correctly;
> however, if it can't, it will never roll over. The solution I proposed
> (throwing an exception) isn't exactly the most elegant, but I can't think
> of a better way to go about it.
>
> - Connor
>
> ---------- Forwarded message ----------
> From: Connor Woodson <cw...@gmail.com>
> Date: Mon, Jan 14, 2013 at 4:25 PM
> Subject: Re: HDFSsink failover error
> To: "user@flume.apache.org" <us...@flume.apache.org>, Rahul Ravindran <
> rahulrv@yahoo.com>
>
>
> Oh alright, found it. What is happening is that the HDFS sink does not
> throw an exception for this write error, but instead returns a
> Status.BACKOFF, and as such the failover processor doesn't think this sink
> failed.
>
> (What is strange is that the processor deals with the backoff message for
> failed sinks, but not active sinks).
>
> So until that's fixed there isn't a clean way to fix this. The best
> solution I can offer is to get the source code for Flume (either the latest
> one or the 1.3.1 tag), and make the following change:
>
> In the process method of the HDFSEventSink, in the catch-statements, change:
>
> LOG.warn("HDFS IO error", eIO);
> return Status.BACKOFF;
>
>
> to:
>
> LOG.warn("HDFS IO error", eIO);
> throw eIO;
>
>
> (Line 457 in 1.3.1 or line 454 in trunk)
>
> What this will end up doing is if you ever use the sink outside of a
> failover processor, when there is a write error then the sink will throw an
> exception and it will probably stop - so, this change will only make it
> able to work within the failover sink processor. Optionally, you could make
> a copy of the HDFSEventSink (call it FailoverHDFSEventSink if you want) and
> put that change in it, so that you can have both versions of the sink.
>
> (if you want instructions on compiling Flume after this change, look for
> the thread 'custome serializer')
>
> Unfortunate issue, but it will be fixed in 1.4 I'm sure.
>
> - Connor
>
> On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <ra...@yahoo.com> wrote:
>
>> Here is the entire log file after I restart flume
>>
>>   ------------------------------
>> *From:* Connor Woodson <cw...@gmail.com>
>> *To:* "user@flume.apache.org" <us...@flume.apache.org>; Rahul Ravindran <
>> rahulrv@yahoo.com>
>> *Sent:* Monday, January 14, 2013 3:51 PM
>>
>> *Subject:* Re: HDFSsink failover error
>>
>> Can you look at the full log file and post the above section as well as
>> 5-10 lines above/below it (you don't have to post that stack trace if you
>> don't want)? Because that error, while it should definitely be logged,
>> should be followed by some error lines giving context as to what is going
>> on. And if that is the end of the log file then...well, that just shouldn't
>> happen, as there are several different places that would have produced log
>> messages as that exception propagates
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>>
>>  The writes to the backup were successful when I attempted to write to it
>> directly but not via the failover sink processor. I did not see the warning
>> that you mentioned about "Sink hdfs-sink1failed".
>>
>> The full log trace is below:
>>
>> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
>> 14 Jan 2013 22:48:24,739 WARN
>>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>          at java.lang.reflect.Method.invoke(Method.java:616)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>>         at $Proxy11.create(Unknown Source)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>>        at
>> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>>         at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:679)
>>
>>   ------------------------------
>> *From:* Connor Woodson <cw...@gmail.com>
>> *To:* Rahul Ravindran <ra...@yahoo.com>
>> *Cc:* "user@flume.apache.org" <us...@flume.apache.org>
>> *Sent:* Monday, January 14, 2013 3:05 PM
>>
>> *Subject:* Re: HDFSsink failover error
>>
>> So you are able to write normally to the back-up HDFS servers? And that
>> error you got was when you were trying to write to the normal server? Was
>> it supposed to be an error (it looks like it's due to how your Hadoop is
>> set up)?
>>
>> The log lines you pasted make it look like there was a problem with your
>> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
>> what should have happened is that the event was then written to the backup
>> server. Below the stack trace there should probably have been another WARN
>> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
>> list". And if hdfs-sink1-back then was unable to write, you would see a
>> thrown EventDeliveryException in your log.
>>
>> If there isn't anything else in the log, and the event wasn't written to
>> the backup server, then that would be a bug.
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>>
>>  Here is the full config. I swapped the priorities on the sink processor
>> after performing the namenode failiver and the writes were successful to
>> the newly active namenode.
>>
>> agent1.channels.ch1.type = FILE
>> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>> agent1.channels.ch1.dataDirs = /flume_runtime/data
>>
>>
>> agent1.channels.ch2.type = FILE
>> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>>
>>
>>
>> # Define an Avro source called avro-source1 on agent1 and tell it
>>
>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>
>> agent1.sources.avro-source1.channels = ch1
>> agent1.sources.avro-source1.type = avro
>> agent1.sources.avro-source1.bind = 0.0.0.0
>> agent1.sources.avro-source1.port = 4545
>>
>>
>>
>> agent1.sources.avro-source2.channels = ch2
>> agent1.sources.avro-source2.type = avro
>> agent1.sources.avro-source2.bind = 0.0.0.0
>> agent1.sources.avro-source2.port = 4546
>>
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2.channel = ch2
>> agent1.sinks.hdfs-sink2.type = hdfs
>> agent1.sinks.hdfs-sink2.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>>
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2-back.channel = ch2
>> agent1.sinks.hdfs-sink2-back.type = hdfs
>> agent1.sinks.hdfs-sink2-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>>
>>
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>> agent1.sinkgroups.failoverGroup2.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>>
>> # Finally, now that we've defined all of our components, tell
>> # agent1 which ones we want to activate.
>> agent1.sinkgroups = failoverGroup1 failoverGroup2
>> agent1.channels = ch1 ch2
>> agent1.sources = avro-source1 avro-source2
>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>>
>>    ------------------------------
>> *From:* Connor Woodson <cw...@gmail.com>
>> *To:* user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
>> *Sent:* Monday, January 14, 2013 2:28 PM
>> *Subject:* Re: HDFSsink failover error
>>
>> I assume that's only part of your config as it's missing a source; if you
>> get rid of the sink processor, can you write to each hdfs sink
>> individually? (comment one out at a time)
>>
>>  - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>>
>> Hi,
>>    I am attempting to setup an HDFS sink such that when a namenode
>> failover occurs (active namenode is brought down and the standby namenode
>> switches to active), the failover sink would send events to the new active
>> namenode. I see an error about WRITE not supported in standby state..Does
>> this not count as a failure for the failover sink?
>> Thanks,
>> ~Rahul.
>>
>> My config is as follows:
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups = failoverGroup1
>>
>> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/....
>> 14 Jan 2013 21:37:28,834 WARN
>>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:616)
>>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Fwd: HDFSsink failover error

Posted by Connor Woodson <cw...@gmail.com>.
Forwarding from the user list. The bug here is that the HDFSEventSink will
not work in a FailoverSinkProcessor. What is going on is that when there is
an IOException, the HDFSEventSink will return Status.BACKOFF; however, in
the failover processor, a sink is only failed if it throws an exception. So
maybe the next process call to the HDFSEventSink will write correctly;
however, if it can't, it will never roll over. The solution I proposed
(throwing an exception) isn't exactly the most elegant, but I can't think
of a better way to go about it.

- Connor

---------- Forwarded message ----------
From: Connor Woodson <cw...@gmail.com>
Date: Mon, Jan 14, 2013 at 4:25 PM
Subject: Re: HDFSsink failover error
To: "user@flume.apache.org" <us...@flume.apache.org>, Rahul Ravindran <
rahulrv@yahoo.com>


Oh alright, found it. What is happening is that the HDFS sink does not
throw an exception for this write error, but instead returns a
Status.BACKOFF, and as such the failover processor doesn't think this sink
failed.

(What is strange is that the processor deals with the backoff message for
failed sinks, but not active sinks).

So until that's fixed there isn't a clean way to fix this. The best
solution I can offer is to get the source code for Flume (either the latest
one or the 1.3.1 tag), and make the following change:

In the process method of the HDFSEventSink, in the catch-statements, change:

LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;


to:

LOG.warn("HDFS IO error", eIO);
throw eIO;


(Line 457 in 1.3.1 or line 454 in trunk)

What this will end up doing is if you ever use the sink outside of a
failover processor, when there is a write error then the sink will throw an
exception and it will probably stop - so, this change will only make it
able to work within the failover sink processor. Optionally, you could make
a copy of the HDFSEventSink (call it FailoverHDFSEventSink if you want) and
put that change in it, so that you can have both versions of the sink.

(if you want instructions on compiling Flume after this change, look for
the thread 'custome serializer')

Unfortunate issue, but it will be fixed in 1.4 I'm sure.

- Connor

On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

> Here is the entire log file after I restart flume
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* "user@flume.apache.org" <us...@flume.apache.org>; Rahul Ravindran <
> rahulrv@yahoo.com>
> *Sent:* Monday, January 14, 2013 3:51 PM
>
> *Subject:* Re: HDFSsink failover error
>
> Can you look at the full log file and post the above section as well as
> 5-10 lines above/below it (you don't have to post that stack trace if you
> don't want)? Because that error, while it should definitely be logged,
> should be followed by some error lines giving context as to what is going
> on. And if that is the end of the log file then...well, that just shouldn't
> happen, as there are several different places that would have produced log
> messages as that exception propagates
>
> - Connor
>
> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
>  The writes to the backup were successful when I attempted to write to it
> directly but not via the failover sink processor. I did not see the warning
> that you mentioned about "Sink hdfs-sink1failed".
>
> The full log trace is below:
>
> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
> 14 Jan 2013 22:48:24,739 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:616)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>         at $Proxy11.create(Unknown Source)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>        at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:679)
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* Rahul Ravindran <ra...@yahoo.com>
> *Cc:* "user@flume.apache.org" <us...@flume.apache.org>
> *Sent:* Monday, January 14, 2013 3:05 PM
>
> *Subject:* Re: HDFSsink failover error
>
> So you are able to write normally to the back-up HDFS servers? And that
> error you got was when you were trying to write to the normal server? Was
> it supposed to be an error (it looks like it's due to how your Hadoop is
> set up)?
>
> The log lines you pasted make it look like there was a problem with your
> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
> what should have happened is that the event was then written to the backup
> server. Below the stack trace there should probably have been another WARN
> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
> list". And if hdfs-sink1-back then was unable to write, you would see a
> thrown EventDeliveryException in your log.
>
> If there isn't anything else in the log, and the event wasn't written to
> the backup server, then that would be a bug.
>
> - Connor
>
>
> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
>  Here is the full config. I swapped the priorities on the sink processor
> after performing the namenode failiver and the writes were successful to
> the newly active namenode.
>
> agent1.channels.ch1.type = FILE
> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
> agent1.channels.ch1.dataDirs = /flume_runtime/data
>
>
> agent1.channels.ch2.type = FILE
> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>
>
>
> # Define an Avro source called avro-source1 on agent1 and tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 4545
>
>
>
> agent1.sources.avro-source2.channels = ch2
> agent1.sources.avro-source2.type = avro
> agent1.sources.avro-source2.bind = 0.0.0.0
> agent1.sources.avro-source2.port = 4546
>
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2.channel = ch2
> agent1.sinks.hdfs-sink2.type = hdfs
> agent1.sinks.hdfs-sink2.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2-back.channel = ch2
> agent1.sinks.hdfs-sink2-back.type = hdfs
> agent1.sinks.hdfs-sink2-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>
>
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
> agent1.sinkgroups.failoverGroup2.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>
> # Finally, now that we've defined all of our components, tell
> # agent1 which ones we want to activate.
> agent1.sinkgroups = failoverGroup1 failoverGroup2
> agent1.channels = ch1 ch2
> agent1.sources = avro-source1 avro-source2
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>
>    ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
> *Sent:* Monday, January 14, 2013 2:28 PM
> *Subject:* Re: HDFSsink failover error
>
> I assume that's only part of your config as it's missing a source; if you
> get rid of the sink processor, can you write to each hdfs sink
> individually? (comment one out at a time)
>
>  - Connor
>
>
> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
> Hi,
>    I am attempting to setup an HDFS sink such that when a namenode
> failover occurs (active namenode is brought down and the standby namenode
> switches to active), the failover sink would send events to the new active
> namenode. I see an error about WRITE not supported in standby state..Does
> this not count as a failure for the failover sink?
> Thanks,
> ~Rahul.
>
> My config is as follows:
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups = failoverGroup1
>
> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/....
> 14 Jan 2013 21:37:28,834 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:616)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>
>
>
>
>
>
>
>
>
>
>

Re: HDFSsink failover error

Posted by Connor Woodson <cw...@gmail.com>.
Oh alright, found it. What is happening is that the HDFS sink does not
throw an exception for this write error, but instead returns a
Status.BACKOFF, and as such the failover processor doesn't think this sink
failed.

(What is strange is that the processor deals with the backoff message for
failed sinks, but not active sinks).

So until that's fixed there isn't a clean way to fix this. The best
solution I can offer is to get the source code for Flume (either the latest
one or the 1.3.1 tag), and make the following change:

In the process method of the HDFSEventSink, in the catch-statements, change:

LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;


to:

LOG.warn("HDFS IO error", eIO);
throw eIO;


(Line 457 in 1.3.1 or line 454 in trunk)

What this will end up doing is if you ever use the sink outside of a
failover processor, when there is a write error then the sink will throw an
exception and it will probably stop - so, this change will only make it
able to work within the failover sink processor. Optionally, you could make
a copy of the HDFSEventSink (call it FailoverHDFSEventSink if you want) and
put that change in it, so that you can have both versions of the sink.

(if you want instructions on compiling Flume after this change, look for
the thread 'custome serializer')

Unfortunate issue, but it will be fixed in 1.4 I'm sure.

- Connor

On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

> Here is the entire log file after I restart flume
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* "user@flume.apache.org" <us...@flume.apache.org>; Rahul Ravindran <
> rahulrv@yahoo.com>
> *Sent:* Monday, January 14, 2013 3:51 PM
>
> *Subject:* Re: HDFSsink failover error
>
> Can you look at the full log file and post the above section as well as
> 5-10 lines above/below it (you don't have to post that stack trace if you
> don't want)? Because that error, while it should definitely be logged,
> should be followed by some error lines giving context as to what is going
> on. And if that is the end of the log file then...well, that just shouldn't
> happen, as there are several different places that would have produced log
> messages as that exception propagates
>
> - Connor
>
> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
>  The writes to the backup were successful when I attempted to write to it
> directly but not via the failover sink processor. I did not see the warning
> that you mentioned about "Sink hdfs-sink1failed".
>
> The full log trace is below:
>
> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
> 14 Jan 2013 22:48:24,739 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:616)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>         at $Proxy11.create(Unknown Source)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>        at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:679)
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* Rahul Ravindran <ra...@yahoo.com>
> *Cc:* "user@flume.apache.org" <us...@flume.apache.org>
> *Sent:* Monday, January 14, 2013 3:05 PM
>
> *Subject:* Re: HDFSsink failover error
>
> So you are able to write normally to the back-up HDFS servers? And that
> error you got was when you were trying to write to the normal server? Was
> it supposed to be an error (it looks like it's due to how your Hadoop is
> set up)?
>
> The log lines you pasted make it look like there was a problem with your
> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
> what should have happened is that the event was then written to the backup
> server. Below the stack trace there should probably have been another WARN
> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
> list". And if hdfs-sink1-back then was unable to write, you would see a
> thrown EventDeliveryException in your log.
>
> If there isn't anything else in the log, and the event wasn't written to
> the backup server, then that would be a bug.
>
> - Connor
>
>
> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
>  Here is the full config. I swapped the priorities on the sink processor
> after performing the namenode failiver and the writes were successful to
> the newly active namenode.
>
> agent1.channels.ch1.type = FILE
> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
> agent1.channels.ch1.dataDirs = /flume_runtime/data
>
>
> agent1.channels.ch2.type = FILE
> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>
>
>
> # Define an Avro source called avro-source1 on agent1 and tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 4545
>
>
>
> agent1.sources.avro-source2.channels = ch2
> agent1.sources.avro-source2.type = avro
> agent1.sources.avro-source2.bind = 0.0.0.0
> agent1.sources.avro-source2.port = 4546
>
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2.channel = ch2
> agent1.sinks.hdfs-sink2.type = hdfs
> agent1.sinks.hdfs-sink2.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2-back.channel = ch2
> agent1.sinks.hdfs-sink2-back.type = hdfs
> agent1.sinks.hdfs-sink2-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>
>
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
> agent1.sinkgroups.failoverGroup2.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>
> # Finally, now that we've defined all of our components, tell
> # agent1 which ones we want to activate.
> agent1.sinkgroups = failoverGroup1 failoverGroup2
> agent1.channels = ch1 ch2
> agent1.sources = avro-source1 avro-source2
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>
>    ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
> *Sent:* Monday, January 14, 2013 2:28 PM
> *Subject:* Re: HDFSsink failover error
>
> I assume that's only part of your config as it's missing a source; if you
> get rid of the sink processor, can you write to each hdfs sink
> individually? (comment one out at a time)
>
>  - Connor
>
>
> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
> Hi,
>    I am attempting to setup an HDFS sink such that when a namenode
> failover occurs (active namenode is brought down and the standby namenode
> switches to active), the failover sink would send events to the new active
> namenode. I see an error about WRITE not supported in standby state..Does
> this not count as a failure for the failover sink?
> Thanks,
> ~Rahul.
>
> My config is as follows:
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups = failoverGroup1
>
> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/....
> 14 Jan 2013 21:37:28,834 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:616)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>
>
>
>
>
>
>
>
>
>
>

Re: HDFSsink failover error

Posted by Rahul Ravindran <ra...@yahoo.com>.
Here is the entire log file after I restart flume


________________________________
 From: Connor Woodson <cw...@gmail.com>
To: "user@flume.apache.org" <us...@flume.apache.org>; Rahul Ravindran <ra...@yahoo.com> 
Sent: Monday, January 14, 2013 3:51 PM
Subject: Re: HDFSsink failover error
 

Can you look at the full log file and post the above section as well as 5-10 lines above/below it (you don't have to post that stack trace if you don't want)? Because that error, while it should definitely be logged, should be followed by some error lines giving context as to what is going on. And if that is the end of the log file then...well, that just shouldn't happen, as there are several different places that would have produced log messages as that exception propagates

- Connor



On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

The writes to the backup were successful when I attempted to write to it directly but not via the failover sink processor. I did not see the warning that you mentioned about "Sink hdfs-sink1failed". 
>
>
>The full log trace is below:
>
>
>14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
>14 Jan 2013 22:48:24,739 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>
>        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>        at $Proxy11.create(Unknown Source)
>        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:616)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>        at $Proxy11.create(Unknown Source)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>        at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>       at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>        at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>        at java.lang.Thread.run(Thread.java:679)
>
>
>
>________________________________
> From: Connor Woodson <cw...@gmail.com>
>To: Rahul Ravindran <ra...@yahoo.com> 
>Cc: "user@flume.apache.org" <us...@flume.apache.org> 
>Sent: Monday, January 14, 2013 3:05 PM
>
>Subject: Re: HDFSsink failover error
> 
>
>
>So you are able to write normally to the back-up HDFS servers? And that error you got was when you were trying to write to the normal server? Was it supposed to be an error (it looks like it's due to how your Hadoop is set up)?
>
>
>The log lines you pasted make it look like there was a problem with your hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong); what should have happened is that the event was then written to the backup server. Below the stack trace there should probably have been another WARN statement saying "Sink hdfs-sink1 failed and has been sent to the failover list". And if hdfs-sink1-back then was unable to write, you would see a thrown EventDeliveryException in your log.
>
>
>If there isn't anything else in the log, and the event wasn't written to the backup server, then that would be a bug.
>
>
>- Connor
>
>
>
>On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com> wrote:
>
>Here is the full config. I swapped the priorities on the sink processor after performing the namenode failiver and the writes were successful to the newly active namenode.
>>
>>
>>agent1.channels.ch1.type = FILE
>>agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>>agent1.channels.ch1.dataDirs = /flume_runtime/data
>>
>>
>>
>>
>>agent1.channels.ch2.type = FILE
>>agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>>agent1.channels.ch2.dataDirs = /flume_runtime/data2
>>
>>
>>
>>
>>
>>
>># Define an Avro source called avro-source1 on agent1 and tell it
>>
>>
>># to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>
>>
>>agent1.sources.avro-source1.channels = ch1
>>agent1.sources.avro-source1.type = avro
>>agent1.sources.avro-source1.bind = 0.0.0.0
>>agent1.sources.avro-source1.port = 4545
>>
>>
>>
>>
>>
>>
>>agent1.sources.avro-source2.channels = ch2
>>agent1.sources.avro-source2.type = avro
>>agent1.sources.avro-source2.bind = 0.0.0.0
>>agent1.sources.avro-source2.port = 4546
>>
>>
>>
>>
>>agent1.sinks.hdfs-sink1.channel = ch1
>>agent1.sinks.hdfs-sink1.type = hdfs
>>agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>>agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>>
>>agent1.sinks.hdfs-sink2.channel = ch2
>>agent1.sinks.hdfs-sink2.type = hdfs
>>agent1.sinks.hdfs-sink2.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>>agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>>
>>
>>
>>
>>agent1.sinks.hdfs-sink1-back.channel = ch1
>>agent1.sinks.hdfs-sink1-back.type = hdfs
>>agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>>agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>>
>>agent1.sinks.hdfs-sink2-back.channel = ch2
>>agent1.sinks.hdfs-sink2-back.type = hdfs
>>agent1.sinks.hdfs-sink2-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>>agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>>
>>
>>
>>
>>
>>
>>agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>>agent1.sinkgroups.failoverGroup1.processor.type = failover
>>#higher number in priority is higher priority
>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>>#failover if failure detected for 10 seconds
>>agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>>
>>
>>agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>>agent1.sinkgroups.failoverGroup2.processor.type = failover
>>#higher number in priority is higher priority
>>agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>>agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>>#failover if failure detected for 10 seconds
>>agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>>
>>
>># Finally, now that we've defined all of our components, tell
>># agent1 which ones we want to activate.
>>agent1.sinkgroups = failoverGroup1 failoverGroup2
>>agent1.channels = ch1 ch2
>>agent1.sources = avro-source1 avro-source2
>>agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>>
>>
>>
>>________________________________
>> From: Connor Woodson <cw...@gmail.com>
>>To: user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com> 
>>Sent: Monday, January 14, 2013 2:28 PM
>>Subject: Re: HDFSsink failover error
>> 
>>
>>
>>I assume that's only part of your config as it's missing a source; if you get rid of the sink processor, can you write to each hdfs sink individually? (comment one out at a time)
>>
>>
>>- Connor
>>
>>
>>
>>On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com> wrote:
>>
>>Hi,
>>>   I am attempting to setup an HDFS sink such that when a namenode failover occurs (active namenode is brought down and the standby namenode switches to active), the failover sink would send events to the new active namenode. I see an error about WRITE not supported in standby state..Does this not count as a failure for the failover sink?
>>>Thanks,
>>>~Rahul.
>>>
>>>
>>>My config is as follows:
>>>
>>>
>>>agent1.sinks.hdfs-sink1.channel = ch1
>>>agent1.sinks.hdfs-sink1.type = hdfs
>>>agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>>>agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>>>agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>>>agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>>>agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>>>agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>>>agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>>>agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>>>agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>>
>>>
>>>agent1.sinks.hdfs-sink1-back.channel = ch1
>>>agent1.sinks.hdfs-sink1-back.type = hdfs
>>>agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>>>agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>>>agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>>>agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>>>agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>>>agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>>>agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>>>agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>>>agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>>
>>>
>>>agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>>>agent1.sinkgroups.failoverGroup1.processor.type = failover
>>>#higher number in priority is higher priority
>>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>>>#failover if failure detected for 10 seconds
>>>agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>>
>>>
>>>
>>>
>>>agent1.sinkgroups = failoverGroup1
>>>
>>>
>>>
>>>14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/....
>>>14 Jan 2013 21:37:28,834 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>>>        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>        at javax.security.auth.Subject.doAs(Subject.java:396)
>>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>>
>>>
>>>        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>>        at $Proxy11.create(Unknown Source)
>>>        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>        at java.lang.reflect.Method.invoke(Method.java:616)
>>>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>>
>>
>>
>
>
>

Re: HDFSsink failover error

Posted by Connor Woodson <cw...@gmail.com>.
Can you look at the full log file and post the above section as well as
5-10 lines above/below it (you don't have to post that stack trace if you
don't want)? Because that error, while it should definitely be logged,
should be followed by some error lines giving context as to what is going
on. And if that is the end of the log file then...well, that just shouldn't
happen, as there are several different places that would have produced log
messages as that exception propagates

- Connor

On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

>  The writes to the backup were successful when I attempted to write to it
> directly but not via the failover sink processor. I did not see the warning
> that you mentioned about "Sink hdfs-sink1failed".
>
> The full log trace is below:
>
> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
> 14 Jan 2013 22:48:24,739 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:616)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>         at $Proxy11.create(Unknown Source)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>        at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:679)
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* Rahul Ravindran <ra...@yahoo.com>
> *Cc:* "user@flume.apache.org" <us...@flume.apache.org>
> *Sent:* Monday, January 14, 2013 3:05 PM
>
> *Subject:* Re: HDFSsink failover error
>
> So you are able to write normally to the back-up HDFS servers? And that
> error you got was when you were trying to write to the normal server? Was
> it supposed to be an error (it looks like it's due to how your Hadoop is
> set up)?
>
> The log lines you pasted make it look like there was a problem with your
> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
> what should have happened is that the event was then written to the backup
> server. Below the stack trace there should probably have been another WARN
> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
> list". And if hdfs-sink1-back then was unable to write, you would see a
> thrown EventDeliveryException in your log.
>
> If there isn't anything else in the log, and the event wasn't written to
> the backup server, then that would be a bug.
>
> - Connor
>
>
> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
>  Here is the full config. I swapped the priorities on the sink processor
> after performing the namenode failiver and the writes were successful to
> the newly active namenode.
>
> agent1.channels.ch1.type = FILE
> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
> agent1.channels.ch1.dataDirs = /flume_runtime/data
>
>
> agent1.channels.ch2.type = FILE
> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>
>
>
> # Define an Avro source called avro-source1 on agent1 and tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 4545
>
>
>
> agent1.sources.avro-source2.channels = ch2
> agent1.sources.avro-source2.type = avro
> agent1.sources.avro-source2.bind = 0.0.0.0
> agent1.sources.avro-source2.port = 4546
>
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2.channel = ch2
> agent1.sinks.hdfs-sink2.type = hdfs
> agent1.sinks.hdfs-sink2.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2-back.channel = ch2
> agent1.sinks.hdfs-sink2-back.type = hdfs
> agent1.sinks.hdfs-sink2-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>
>
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
> agent1.sinkgroups.failoverGroup2.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>
> # Finally, now that we've defined all of our components, tell
> # agent1 which ones we want to activate.
> agent1.sinkgroups = failoverGroup1 failoverGroup2
> agent1.channels = ch1 ch2
> agent1.sources = avro-source1 avro-source2
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>
>    ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
> *Sent:* Monday, January 14, 2013 2:28 PM
> *Subject:* Re: HDFSsink failover error
>
> I assume that's only part of your config as it's missing a source; if you
> get rid of the sink processor, can you write to each hdfs sink
> individually? (comment one out at a time)
>
>  - Connor
>
>
> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
> Hi,
>    I am attempting to setup an HDFS sink such that when a namenode
> failover occurs (active namenode is brought down and the standby namenode
> switches to active), the failover sink would send events to the new active
> namenode. I see an error about WRITE not supported in standby state..Does
> this not count as a failure for the failover sink?
> Thanks,
> ~Rahul.
>
> My config is as follows:
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups = failoverGroup1
>
> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/....
> 14 Jan 2013 21:37:28,834 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:616)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>
>
>
>
>
>
>
>

Re: HDFSsink failover error

Posted by Rahul Ravindran <ra...@yahoo.com>.
The writes to the backup were successful when I attempted to write to it directly but not via the failover sink processor. I did not see the warning that you mentioned about "Sink hdfs-sink1failed". 

The full log trace is below:

14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
14 Jan 2013 22:48:24,739 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)

        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
        at $Proxy11.create(Unknown Source)
        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at $Proxy11.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
        at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
       at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
        at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)


________________________________
 From: Connor Woodson <cw...@gmail.com>
To: Rahul Ravindran <ra...@yahoo.com> 
Cc: "user@flume.apache.org" <us...@flume.apache.org> 
Sent: Monday, January 14, 2013 3:05 PM
Subject: Re: HDFSsink failover error
 

So you are able to write normally to the back-up HDFS servers? And that error you got was when you were trying to write to the normal server? Was it supposed to be an error (it looks like it's due to how your Hadoop is set up)?

The log lines you pasted make it look like there was a problem with your hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong); what should have happened is that the event was then written to the backup server. Below the stack trace there should probably have been another WARN statement saying "Sink hdfs-sink1 failed and has been sent to the failover list". And if hdfs-sink1-back then was unable to write, you would see a thrown EventDeliveryException in your log.

If there isn't anything else in the log, and the event wasn't written to the backup server, then that would be a bug.

- Connor



On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

Here is the full config. I swapped the priorities on the sink processor after performing the namenode failiver and the writes were successful to the newly active namenode.
>
>
>agent1.channels.ch1.type = FILE
>agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>agent1.channels.ch1.dataDirs = /flume_runtime/data
>
>
>
>
>agent1.channels.ch2.type = FILE
>agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>agent1.channels.ch2.dataDirs = /flume_runtime/data2
>
>
>
>
>
>
># Define an Avro source called avro-source1 on agent1 and tell it
>
>
># to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
>
>agent1.sources.avro-source1.channels = ch1
>agent1.sources.avro-source1.type = avro
>agent1.sources.avro-source1.bind = 0.0.0.0
>agent1.sources.avro-source1.port = 4545
>
>
>
>
>
>
>agent1.sources.avro-source2.channels = ch2
>agent1.sources.avro-source2.type = avro
>agent1.sources.avro-source2.bind = 0.0.0.0
>agent1.sources.avro-source2.port = 4546
>
>
>
>
>agent1.sinks.hdfs-sink1.channel = ch1
>agent1.sinks.hdfs-sink1.type = hdfs
>agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
>
>agent1.sinks.hdfs-sink2.channel = ch2
>agent1.sinks.hdfs-sink2.type = hdfs
>agent1.sinks.hdfs-sink2.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>
>
>
>
>agent1.sinks.hdfs-sink1-back.channel = ch1
>agent1.sinks.hdfs-sink1-back.type = hdfs
>agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
>
>agent1.sinks.hdfs-sink2-back.channel = ch2
>agent1.sinks.hdfs-sink2-back.type = hdfs
>agent1.sinks.hdfs-sink2-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>
>
>
>
>
>
>agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>agent1.sinkgroups.failoverGroup1.processor.type = failover
>#higher number in priority is higher priority
>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>#failover if failure detected for 10 seconds
>agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
>
>
>agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>agent1.sinkgroups.failoverGroup2.processor.type = failover
>#higher number in priority is higher priority
>agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>#failover if failure detected for 10 seconds
>agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>
>
># Finally, now that we've defined all of our components, tell
># agent1 which ones we want to activate.
>agent1.sinkgroups = failoverGroup1 failoverGroup2
>agent1.channels = ch1 ch2
>agent1.sources = avro-source1 avro-source2
>agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>
>
>
>________________________________
> From: Connor Woodson <cw...@gmail.com>
>To: user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com> 
>Sent: Monday, January 14, 2013 2:28 PM
>Subject: Re: HDFSsink failover error
> 
>
>
>I assume that's only part of your config as it's missing a source; if you get rid of the sink processor, can you write to each hdfs sink individually? (comment one out at a time)
>
>
>- Connor
>
>
>
>On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com> wrote:
>
>Hi,
>>   I am attempting to setup an HDFS sink such that when a namenode failover occurs (active namenode is brought down and the standby namenode switches to active), the failover sink would send events to the new active namenode. I see an error about WRITE not supported in standby state..Does this not count as a failure for the failover sink?
>>Thanks,
>>~Rahul.
>>
>>
>>My config is as follows:
>>
>>
>>agent1.sinks.hdfs-sink1.channel = ch1
>>agent1.sinks.hdfs-sink1.type = hdfs
>>agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>>agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>>
>>agent1.sinks.hdfs-sink1-back.channel = ch1
>>agent1.sinks.hdfs-sink1-back.type = hdfs
>>agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>>agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>>agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>>agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>>agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>>agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>>agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>>agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>>agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>>
>>agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>>agent1.sinkgroups.failoverGroup1.processor.type = failover
>>#higher number in priority is higher priority
>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>>#failover if failure detected for 10 seconds
>>agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>>
>>
>>agent1.sinkgroups = failoverGroup1
>>
>>
>>
>>14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/....
>>14 Jan 2013 21:37:28,834 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>>        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>        at java.security.AccessController.doPrivileged(Native Method)
>>        at javax.security.auth.Subject.doAs(Subject.java:396)
>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>
>>        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>        at $Proxy11.create(Unknown Source)
>>        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>        at java.lang.reflect.Method.invoke(Method.java:616)
>>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>
>
>

Re: HDFSsink failover error

Posted by Connor Woodson <cw...@gmail.com>.
So you are able to write normally to the back-up HDFS servers? And that
error you got was when you were trying to write to the normal server? Was
it supposed to be an error (it looks like it's due to how your Hadoop is
set up)?

The log lines you pasted make it look like there was a problem with your
hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
what should have happened is that the event was then written to the backup
server. Below the stack trace there should probably have been another WARN
statement saying "Sink hdfs-sink1 failed and has been sent to the failover
list". And if hdfs-sink1-back then was unable to write, you would see a
thrown EventDeliveryException in your log.

If there isn't anything else in the log, and the event wasn't written to
the backup server, then that would be a bug.

- Connor


On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

> Here is the full config. I swapped the priorities on the sink processor
> after performing the namenode failiver and the writes were successful to
> the newly active namenode.
>
> agent1.channels.ch1.type = FILE
> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
> agent1.channels.ch1.dataDirs = /flume_runtime/data
>
>
> agent1.channels.ch2.type = FILE
> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>
>
>
> # Define an Avro source called avro-source1 on agent1 and tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 4545
>
>
>
> agent1.sources.avro-source2.channels = ch2
> agent1.sources.avro-source2.type = avro
> agent1.sources.avro-source2.bind = 0.0.0.0
> agent1.sources.avro-source2.port = 4546
>
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2.channel = ch2
> agent1.sinks.hdfs-sink2.type = hdfs
> agent1.sinks.hdfs-sink2.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink2-back.channel = ch2
> agent1.sinks.hdfs-sink2-back.type = hdfs
> agent1.sinks.hdfs-sink2-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>
>
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
> agent1.sinkgroups.failoverGroup2.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>
> # Finally, now that we've defined all of our components, tell
> # agent1 which ones we want to activate.
> agent1.sinkgroups = failoverGroup1 failoverGroup2
> agent1.channels = ch1 ch2
> agent1.sources = avro-source1 avro-source2
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>
>   ------------------------------
> *From:* Connor Woodson <cw...@gmail.com>
> *To:* user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com>
> *Sent:* Monday, January 14, 2013 2:28 PM
> *Subject:* Re: HDFSsink failover error
>
> I assume that's only part of your config as it's missing a source; if you
> get rid of the sink processor, can you write to each hdfs sink
> individually? (comment one out at a time)
>
> - Connor
>
>
> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com>wrote:
>
> Hi,
>    I am attempting to setup an HDFS sink such that when a namenode
> failover occurs (active namenode is brought down and the standby namenode
> switches to active), the failover sink would send events to the new active
> namenode. I see an error about WRITE not supported in standby state..Does
> this not count as a failure for the failover sink?
> Thanks,
> ~Rahul.
>
> My config is as follows:
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups = failoverGroup1
>
> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/....
> 14 Jan 2013 21:37:28,834 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:616)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>
>
>
>
>

Re: HDFSsink failover error

Posted by Rahul Ravindran <ra...@yahoo.com>.
Here is the full config. I swapped the priorities on the sink processor after performing the namenode failiver and the writes were successful to the newly active namenode.

agent1.channels.ch1.type = FILE
agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
agent1.channels.ch1.dataDirs = /flume_runtime/data


agent1.channels.ch2.type = FILE
agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
agent1.channels.ch2.dataDirs = /flume_runtime/data2



# Define an Avro source called avro-source1 on agent1 and tell it

# to bind to 0.0.0.0:41414. Connect it to channel ch1.

agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 4545



agent1.sources.avro-source2.channels = ch2
agent1.sources.avro-source2.type = avro
agent1.sources.avro-source2.bind = 0.0.0.0
agent1.sources.avro-source2.port = 4546


agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000

agent1.sinks.hdfs-sink2.channel = ch2
agent1.sinks.hdfs-sink2.type = hdfs
agent1.sinks.hdfs-sink2.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000


agent1.sinks.hdfs-sink1-back.channel = ch1
agent1.sinks.hdfs-sink1-back.type = hdfs
agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000

agent1.sinks.hdfs-sink2-back.channel = ch2
agent1.sinks.hdfs-sink2-back.type = hdfs
agent1.sinks.hdfs-sink2-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000



agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
agent1.sinkgroups.failoverGroup1.processor.type = failover
#higher number in priority is higher priority
agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
#failover if failure detected for 10 seconds
agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000


agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
agent1.sinkgroups.failoverGroup2.processor.type = failover
#higher number in priority is higher priority
agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
#failover if failure detected for 10 seconds
agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.sinkgroups = failoverGroup1 failoverGroup2
agent1.channels = ch1 ch2
agent1.sources = avro-source1 avro-source2
agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back


________________________________
 From: Connor Woodson <cw...@gmail.com>
To: user@flume.apache.org; Rahul Ravindran <ra...@yahoo.com> 
Sent: Monday, January 14, 2013 2:28 PM
Subject: Re: HDFSsink failover error
 

I assume that's only part of your config as it's missing a source; if you get rid of the sink processor, can you write to each hdfs sink individually? (comment one out at a time)

- Connor



On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

Hi,
>   I am attempting to setup an HDFS sink such that when a namenode failover occurs (active namenode is brought down and the standby namenode switches to active), the failover sink would send events to the new active namenode. I see an error about WRITE not supported in standby state..Does this not count as a failure for the failover sink?
>Thanks,
>~Rahul.
>
>
>My config is as follows:
>
>
>agent1.sinks.hdfs-sink1.channel = ch1
>agent1.sinks.hdfs-sink1.type = hdfs
>agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
>
>agent1.sinks.hdfs-sink1-back.channel = ch1
>agent1.sinks.hdfs-sink1-back.type = hdfs
>agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
>
>agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>agent1.sinkgroups.failoverGroup1.processor.type = failover
>#higher number in priority is higher priority
>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>#failover if failure detected for 10 seconds
>agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
>
>
>agent1.sinkgroups = failoverGroup1
>
>
>
>14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating hdfs://ip-10-4-71-187.ec2.internal/....
>14 Jan 2013 21:37:28,834 WARN  [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>        at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>        at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>
>        at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>        at $Proxy11.create(Unknown Source)
>        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>        at java.lang.reflect.Method.invoke(Method.java:616)
>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho

Re: HDFSsink failover error

Posted by Connor Woodson <cw...@gmail.com>.
I assume that's only part of your config as it's missing a source; if you
get rid of the sink processor, can you write to each hdfs sink
individually? (comment one out at a time)

- Connor


On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <ra...@yahoo.com> wrote:

> Hi,
>    I am attempting to setup an HDFS sink such that when a namenode
> failover occurs (active namenode is brought down and the standby namenode
> switches to active), the failover sink would send events to the new active
> namenode. I see an error about WRITE not supported in standby state..Does
> this not count as a failure for the failover sink?
> Thanks,
> ~Rahul.
>
> My config is as follows:
>
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path =
> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>
> agent1.sinks.hdfs-sink1-back.channel = ch1
> agent1.sinks.hdfs-sink1-back.type = hdfs
> agent1.sinks.hdfs-sink1-back.hdfs.path =
> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>
> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
> agent1.sinkgroups.failoverGroup1.processor.type = failover
> #higher number in priority is higher priority
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
> #failover if failure detected for 10 seconds
> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>
>
> agent1.sinkgroups = failoverGroup1
>
> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
> hdfs://ip-10-4-71-187.ec2.internal/....
> 14 Jan 2013 21:37:28,834 WARN
>  [SinkRunner-PollingRunner-FailoverSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby
>         at
> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>         at $Proxy11.create(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:616)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>