Posted to user@flume.apache.org by Bijoy Deb <bi...@gmail.com> on 2015/08/06 11:08:28 UTC

Flume issue: Copying the same source file multiple times with different timestamps in case of HDFS IO error

Hi,

I have a Flume process that transfers multiple files (around 10 files of
400GB each) per day from a specific source directory to an HDFS sink. I am
facing an issue when there is an HDFS IO error while Flume is in the
process of copying the files from source to sink. The issue is that Flume
copies the same file to the sink twice, with 2 different timestamps, resulting
in duplicated data in my downstream processes, which is not what I want.

Can anyone kindly let me know if this is a known issue with Flume, and if
so, is there any workaround?

Relevant details:
Flume version: 1.3.1
1. Source/Spool dir File location: /test/part1/2015072110_layer2_1.gz

2. HDFS sink/destination: hdfs:///staging/test/

3. Files dumped in sink by Flume:
/staging/test/2015072110_layer2_1.1437634754144.gz
/staging/test/2015072110_layer2_1.1437634754145.gz

4. Flume agent logs:

(SpoolingFileReader.java:170)] File is processed....************************/test/part1/2015072110_layer2_1.gz
2015-07-23 02:59:09,392 (pool-14-thread-1) [INFO - com.flume.spool.zip.SpoolingFileReader.retireCurrentFile(SpoolingFileReader.java:270)] Preparing to move file /test/part1/2015072110_layer2_1.gz to /test/part1/2015072110_layer2_1.gz.COMPLETED
2015-07-23 02:59:09,395 (pool-14-thread-1) [INFO - com.flume.spool.zip.SpoolingFileReader.readEvents(SpoolingFileReader.java:176)] flag was set as true
2015-07-23 02:59:14,808 (hdfs-c1s1-call-runner-8) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating /staging/test/2015072110_layer2_1.1437634754144.gz.tmp
2015-07-23 02:59:32,144 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] HDFS IO error
java.io.IOException: Callable timed out after 18000 ms
    at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:352)
    at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:727)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:853)
Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:212)
    at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:345)
    ... 5 more
2015-07-23 02:59:37,269 (hdfs-c1s1-call-runner-9) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming /staging/test/2015072110_layer2_1.1437634754144.gz.tmp to /staging/test/2015072110_layer2_1.1437634754144.gz
2015-07-23 02:59:38,513 (hdfs-c1s1-call-runner-9) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating /staging/test/2015072110_layer2_1.1437634754145.gz.tmp
2015-07-23 02:59:56,333 (hdfs-c1s1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter$5.call(BucketWriter.java:322)] Closing idle bucketWriter /staging/test/2015072110_layer2_1
2015-07-23 02:59:56,340 (hdfs-c1s1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming /staging/test/2015072110_layer2_1.1437634754145.gz.tmp to /staging/test/2015072110_layer2_1.1437634754145.gz
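
The two sink files listed under item 3 differ only in the numeric counter before `.gz`. As a rough way to flag such pairs, here is a minimal Python sketch. It assumes each source file is expected to produce exactly one sink file, and that the counter is a 13-digit epoch-millisecond value as in the names above. The function name `find_repeated_prefixes` and the pattern are illustrative and not part of Flume.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

# Assumed naming scheme: <prefix>.<13-digit counter>.gz, as in the listing above.
SINK_NAME = re.compile(r"^(?P<prefix>.+)\.(?P<counter>\d{13})\.gz$")


def find_repeated_prefixes(paths: Iterable[str]) -> Dict[str, List[str]]:
    """Return prefixes that appear with more than one counter value."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for path in paths:
        match = SINK_NAME.match(path)
        if match:
            groups[match.group("prefix")].append(path)
    # Keep only prefixes that were written more than once.
    return {p: sorted(files) for p, files in groups.items() if len(files) > 1}


if __name__ == "__main__":
    listing = [
        "/staging/test/2015072110_layer2_1.1437634754144.gz",
        "/staging/test/2015072110_layer2_1.1437634754145.gz",
    ]
    for prefix, files in find_repeated_prefixes(listing).items():
        print(prefix, files)
```

If the sink is configured to roll one source file into several output files, repeated prefixes are normal and this check would not apply.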

Thanks,
Bijoy

Re: Flume issue: Copying the same source file multiple times with different timestamps in case of HDFS IO error

Posted by iain wright <ia...@gmail.com>.
Personally, I would never want Flume to have the capability to delete
anything from HDFS.

I'm pretty sure Flume's at-least-once delivery was a design decision from
the beginning; maybe a dev can chime in on the complexity of exactly-once
delivery (disclaimer: I am just a user).

We dedup in Spark before running analytics/reports on our data; maybe
something similar is possible for you.
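
The thread does not say how the Spark job removes duplicates, so the following is only a plain-Python stand-in for the idea, not the actual job. It assumes a record counts as a duplicate when its bytes are identical to an earlier record; `dedup_records` is a hypothetical helper name.

```python
import hashlib
from typing import Iterable, Iterator


def dedup_records(records: Iterable[bytes]) -> Iterator[bytes]:
    """Yield each distinct record once, keeping first-seen order."""
    seen = set()
    for record in records:
        # Store a fixed-size digest instead of the record itself.
        digest = hashlib.sha256(record).digest()
        if digest not in seen:
            seen.add(digest)
            yield record


if __name__ == "__main__":
    replayed = [b"row-1", b"row-2", b"row-1", b"row-2", b"row-3"]
    print(list(dedup_records(replayed)))
```

Note that content-based dedup also drops records that are legitimately repeated in the source, so it only fits data where every record is expected to be unique (for example, records carrying their own ID).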

Just curious, what rollSize/rollCount/etc. is configured in your sink? What
are the HDFS cluster specs? Is it bare metal or cloud-provider based?

Cheers,



-- 
Iain Wright

Re: Flume issue: Copying the same source file multiple times with different timestamps in case of HDFS IO error

Posted by Bijoy Deb <bi...@gmail.com>.
Hi AnandKumar,

Thanks for the suggestion. We will try it out in our PROD environment.

But shouldn't Flume delete any older file with the same name before
creating a new one? This looks like a bug to me. We can increase the timeout
and resolve the issue, but what if someday that timeout value is also not
sufficient and Flume still creates duplicate files?

Is anyone aware of an issue logged for Flume on this?

Thanks,
Bijoy

Re: Flume issue: Copying the same source file multiple times with different timestamps in case of HDFS IO error

Posted by Anandkumar Lakshmanan <an...@orzota.com>.
Hi Bijoy,

This happens because callTimeout is too short: the HDFS cluster does not
complete the call within the time the Flume HDFS sink waits for it.
Flume therefore retries the entire transaction, and events that were written
as part of the failed transaction are written to HDFS again as part of the
retried transaction.
Increase the timeout value; I use 150000 in my production environment.
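
To illustrate the retry behaviour described here, the following is a toy Python model, not Flume code. It assumes the slow write still reaches the store even though the caller has stopped waiting. `SlowStore` and `deliver` are hypothetical names, and the latencies are made up. In a real agent the corresponding setting is the HDFS sink's `hdfs.callTimeout` property, given in milliseconds.

```python
from typing import List


class SlowStore:
    """Stand-in for HDFS: a write lands, but may take longer than the caller waits."""

    def __init__(self, latencies_ms: List[int]) -> None:
        self.latencies_ms = list(latencies_ms)  # latency of each successive call
        self.written: List[str] = []

    def append(self, events: List[str], call_timeout_ms: int) -> None:
        latency = self.latencies_ms.pop(0)
        # Assumption: the data reaches the store regardless of the timeout.
        self.written.extend(events)
        if latency > call_timeout_ms:
            raise TimeoutError(f"Callable timed out after {call_timeout_ms} ms")


def deliver(store: SlowStore, batch: List[str], call_timeout_ms: int,
            max_attempts: int = 3) -> int:
    """Resend the whole batch until a call returns in time (at-least-once)."""
    for attempt in range(1, max_attempts + 1):
        try:
            store.append(batch, call_timeout_ms)
            return attempt
        except TimeoutError:
            continue  # transaction rolled back; the same events are sent again
    raise RuntimeError("gave up after repeated timeouts")


if __name__ == "__main__":
    batch = ["event-1", "event-2"]
    for timeout_ms in (18000, 150000):
        store = SlowStore([25000, 5000])  # first call is slow, second is fast
        attempts = deliver(store, batch, timeout_ms)
        print(timeout_ms, attempts, store.written)
```

With the shorter timeout the batch is sent twice and the store ends up with each event duplicated. With the longer timeout the first call is allowed to finish and nothing is resent. A larger timeout makes the retry less likely but does not rule it out.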


Thanks
Anand.
