Posted to user@flume.apache.org by 周梦想 <ab...@gmail.com> on 2013/02/27 07:49:25 UTC

windows spooldir source problem

Hello,
I have a question about using the spooldir source.

If I have a large file (say, more than 100 MB) and I copy it into the
spooldir, the Flume agent finds it immediately and begins sending data to
another agent. If Flume reads faster than the OS writes, it reaches the end
of the file and starts renaming it while the OS is still writing data to
it. How should this condition be handled?

I encountered this situation on Windows. One thread tries to rename the
file to .fin and reports a permission error. Another thread renames the
file successfully. After that, this thread complains
"java.io.IOException: Stream closed" again and again.

What should I do?

Below is the error log from Windows.
Thanks,
Andy

27 Feb 2013 12:00:05,310 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile:229) - Preparing to move file D:\TKServer\HandResult\_BakLog\201302271200handresult_hllord.log to D:\TKServer\HandResult\_BakLog\201302271200handresult_hllord.log.fin
27 Feb 2013 12:00:05,310 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:148) - Uncaught exception in Runnable
org.apache.flume.FlumeException: Unable to move D:\TKServer\HandResult\_BakLog\201302271200handresult_hllord.log to D:\TKServer\HandResult\_BakLog\201302271200handresult_hllord.log.fin. This will likely cause duplicate events. Please verify that flume has sufficient permissions to perform these operations.
    at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:282)
    at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
27 Feb 2013 12:00:05,825 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:148) - Uncaught exception in Runnable
java.io.IOException: Stream closed
    at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
    at java.io.BufferedReader.readLine(BufferedReader.java:292)
    at java.io.BufferedReader.readLine(BufferedReader.java:362)
    at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)

Re: windows spooldir source problem

Posted by 周梦想 <ab...@gmail.com>.
Yes, we use the Windows cmd mv command to move the file, and we run it as
a scheduled task.

I added a patch to Flume on Windows to check whether the file is still
being written:
flume-ng 1.3.1 SpoolingFileLineReader.java +319
      File nextFile = candidateFiles.get(0);
      try {
        // added by andy: renaming the file to its own name fails while
        // another process still has it open, so skip it for now and let a
        // later poll pick it up
        if (!nextFile.renameTo(nextFile)) {
          logger.info("zhh:The file is writing,try again:" + nextFile);
          return Optional.absent();
        }
        // end

        int bufferSize = bufferMaxLines * bufferMaxLineLength;
        BufferedReader reader = new BufferedReader(new FileReader(nextFile),
            bufferSize);

I'm monitoring and testing it to see whether it avoids the problem.

Andy
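
The rename-to-itself check in the patch above appears to depend on Windows refusing to rename a file that another process holds open; on other platforms the same call will generally succeed even while the file is being written. A different, more portable heuristic (not what the patch does, and not part of Flume) is to treat a file as complete only when its size and modification time stay unchanged for a short quiet period. The sketch below assumes that idea; the class name SpoolFileReadiness, the method looksFullyWritten, and the quietMillis parameter are all hypothetical.

```java
import java.io.File;

// Hypothetical helper, not part of Flume: a heuristic "is this file done?" check.
final class SpoolFileReadiness {
    private SpoolFileReadiness() {}

    /**
     * Returns true if the file exists and neither its length nor its
     * last-modified time changes during quietMillis milliseconds.
     * This is only a heuristic: a writer that pauses longer than the quiet
     * period, or one that preallocates the file, can still fool it.
     */
    static boolean looksFullyWritten(File file, long quietMillis)
            throws InterruptedException {
        if (!file.isFile()) {
            return false; // missing files and directories are never ready
        }
        long sizeBefore = file.length();
        long modifiedBefore = file.lastModified();

        Thread.sleep(quietMillis); // give a concurrent writer time to append

        return file.isFile()
                && file.length() == sizeBefore
                && file.lastModified() == modifiedBefore;
    }
}
```

A caller would skip any candidate for which looksFullyWritten returns false and retry it on the next poll, much as the patch returns Optional.absent(). Moving only completed files into the spool directory remains the more reliable approach.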

2013/2/28 Roshan Naik <ro...@hortonworks.com>

> The spool dir source is designed with the expectation that you will move
> the completely written files into the spooling directory location. so let
> your windows app write the files and once they are done.. move it to
> another dir for consumption by flume.
>
>
> On Wed, Feb 27, 2013 at 8:42 AM, Jeff Lord <jl...@cloudera.com> wrote:
>
>> Have you considered using the move command instead of copy?
>>
>>
>

Re: windows spooldir source problem

Posted by Roshan Naik <ro...@hortonworks.com>.
The spool dir source is designed with the expectation that you will move
completely written files into the spooling directory. So let your Windows
app write the files somewhere else and, once they are done, move them into
another directory for consumption by Flume.
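
For a writer that is itself a Java program, the "write elsewhere, then move" pattern described above might look like the following minimal sketch. It assumes Java 7 or later (java.nio.file), and that the staging directory and the spool directory are on the same volume so the move is a rename rather than a copy. The class name SpoolDropper and the method dropIntoSpoolDir are hypothetical and not part of Flume.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of Flume: hands a finished file to the spool directory.
final class SpoolDropper {
    private SpoolDropper() {}

    /**
     * Moves a completely written file from a staging directory into the
     * spool directory, keeping its file name. ATOMIC_MOVE asks the file
     * system for a rename, so the file should only become visible in the
     * spool directory once it is complete.
     */
    static Path dropIntoSpoolDir(Path finishedFile, Path spoolDir) throws IOException {
        Path target = spoolDir.resolve(finishedFile.getFileName());
        try {
            return Files.move(finishedFile, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Do not fall back to copying: a copy is exactly what lets the
            // reader see a half-written file.
            throw new IOException(
                "Staging and spool directories must be on the same volume: " + target, e);
        }
    }
}
```

The writer would create and close the file under the staging directory first, then call dropIntoSpoolDir. What happens when a file with the same name already exists in the spool directory is implementation specific for ATOMIC_MOVE, so unique file names are assumed here.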


On Wed, Feb 27, 2013 at 8:42 AM, Jeff Lord <jl...@cloudera.com> wrote:

> Have you considered using the move command instead of copy?
>
>

Re: windows spooldir source problem

Posted by Jeff Lord <jl...@cloudera.com>.
Have you considered using the move command instead of copy?

