Posted to dev@flume.apache.org by "Muhammad Ehsan ul Haque (JIRA)" <ji...@apache.org> on 2014/04/10 14:18:14 UTC

[jira] [Updated] (FLUME-2318) SpoolingDirectory is unable to handle empty files

     [ https://issues.apache.org/jira/browse/FLUME-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muhammad Ehsan ul Haque updated FLUME-2318:
-------------------------------------------

    Attachment: FLUME-2318-2.patch

Fixed the bug that caused an exception to be thrown when an empty file is the last file consumed by the spooling directory source. Also added an option to consume an empty file as an event, and updated the user guide accordingly.
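The patch itself is not reproduced here. As a rough illustration only, the sketch below shows one way a reader could meet both points, in a simplified model where each "file" is a list of lines and each line becomes one event body. FixedSpoolReaderModel, the emptyFileAsEvent flag and poll() are hypothetical names, not Flume's API, and this is not necessarily what FLUME-2318-2.patch does. The two changes it models: an empty batch is never returned with a commit still outstanding, and, with the flag on, an empty file reads as a single event with an empty body.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical sketch, not Flume's API and not the code in FLUME-2318-2.patch.
public class FixedSpoolReaderModel {
  private final LinkedHashMap<String, List<String>> spool; // file name -> lines, in spool order
  private final boolean emptyFileAsEvent;                  // hypothetical option
  private String current;                                  // null: no file open
  private List<String> lines = Collections.emptyList();    // contents of the current file
  private int pos;                                         // next line to read
  private int mark;                                        // position at the last commit
  private boolean committed = true;
  final List<String> rolled = new ArrayList<>();           // files "moved to .COMPLETED"

  FixedSpoolReaderModel(LinkedHashMap<String, List<String>> spool, boolean emptyFileAsEvent) {
    this.spool = spool;
    this.emptyFileAsEvent = emptyFileAsEvent;
  }

  private void openNextFile() {
    current = spool.isEmpty() ? null : spool.keySet().iterator().next();
    lines = current == null ? Collections.<String>emptyList() : spool.get(current);
    if (current != null && lines.isEmpty() && emptyFileAsEvent) {
      lines = Collections.singletonList(""); // empty file -> one empty-bodied event
    }
    pos = 0;
    mark = 0;
  }

  private List<String> readRemaining() {
    List<String> batch = new ArrayList<>(lines.subList(pos, lines.size()));
    pos = lines.size();
    return batch;
  }

  List<String> readEvents() {
    if (!committed) {
      pos = mark;                      // replay the uncommitted batch
    } else if (current == null) {
      openNextFile();
    }
    if (current == null) {
      return Collections.emptyList();  // nothing to read, nothing outstanding
    }
    List<String> events = readRemaining();
    if (events.isEmpty()) {            // end of file: roll it and try the next one
      rolled.add(current);
      spool.remove(current);
      openNextFile();
      if (current != null) {
        events = readRemaining();
      }
    }
    // The fix in this model: an empty batch has nothing to commit, so it
    // never leaves a commit outstanding.
    committed = events.isEmpty();
    return events;
  }

  void commit() {
    mark = pos;
    committed = true;
  }

  // Mirrors one run of the source thread: read until an empty batch, committing each non-empty one.
  List<String> poll() {
    List<String> delivered = new ArrayList<>();
    for (List<String> batch = readEvents(); !batch.isEmpty(); batch = readEvents()) {
      delivered.addAll(batch);
      commit();
    }
    return delivered;
  }

  public static void main(String[] args) {
    for (boolean option : new boolean[] {false, true}) {
      LinkedHashMap<String, List<String>> spool = new LinkedHashMap<>();
      spool.put("file1", Collections.singletonList("some data"));
      spool.put("file2", Collections.<String>emptyList());
      FixedSpoolReaderModel reader = new FixedSpoolReaderModel(spool, option);
      List<String> first = reader.poll();
      List<String> second = reader.poll();
      // prints "emptyFileAsEvent=false: [some data] then []"
      // then   "emptyFileAsEvent=true: [some data, ] then []"
      System.out.println("emptyFileAsEvent=" + option + ": " + first + " then " + second);
    }
  }
}
```

Treating an empty batch as already committed is safe in this model because there is nothing to replay. The flag only changes what an empty file looks like to the read loop, so the commit and roll logic is the same for both settings.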

> SpoolingDirectory is unable to handle empty files
> -------------------------------------------------
>
>                 Key: FLUME-2318
>                 URL: https://issues.apache.org/jira/browse/FLUME-2318
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Muhammad Ehsan ul Haque
>            Priority: Minor
>              Labels: easytest, patch
>             Fix For: v1.4.0
>
>         Attachments: FLUME-2318-0.patch, FLUME-2318-1.patch, FLUME-2318-2.patch
>
>
> Empty files should be returned as an empty event instead of no event.
> h4. Scenario
> From the start, consume the files in this order:
> # f1: a file with data, or an empty file
> # f2: an empty file
> # No file left in the spooling directory
> h4. Expected Outcome
> # channel.take() should return event with f1 data.
> # channel.take() should return event with f2 data (empty data).
> # channel.take() should return null.
> h4. What happens
> # channel.take() returns event with f1 data.
> # channel.take() returns null.
> # An exception is raised when the SpoolDirectorySource thread tries to read events from the ReliableSpoolingFileEventReader. A snippet of the trace:
> {code}
> 2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
> 2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
> 2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
> 2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
> java.lang.IllegalStateException: File should not roll when commit is outstanding.
> 	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
> 	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:722)
> {code}
> h4. Unit Test
> In TestSpoolDirectorySource
> {code}
>   @Test
>   public void testWithEmptyFile2()
>       throws InterruptedException, IOException {
>     Context context = new Context();
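>     // file1 holds data, file2 is empty
>     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
>     Files.write("some data".getBytes(), f1);
>     File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
>     Files.write(new byte[0], f2);
>     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
>         tmpDir.getAbsolutePath());
>     Configurables.configure(source, context);
>     source.start();
>     // Give the source thread time to consume both files
>     Thread.sleep(10);
>     // Expected: an event with file1's data, then an empty event for file2
>     byte[][] expectedBodies = { "some data".getBytes(), new byte[0] };
>     for (int i = 0; i < 2; i++) {
>       Transaction txn = channel.getTransaction();
>       txn.begin();
>       Event e = channel.take();
>       Assert.assertNotNull("Missing event for file" + (i + 1), e);
>       Assert.assertArrayEquals(expectedBodies[i], e.getBody());
>       txn.commit();
>       txn.close();
>     }
>     // Expected: no further events once both files are consumed
>     Transaction txn = channel.getTransaction();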
>     txn.begin();
>     Assert.assertNull(channel.take());
>     txn.commit();
>     txn.close();
>   }
> {code}
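The trace in the quoted report (file1 rolled, "Last read was never committed", file2 rolled, then the IllegalStateException) fits the following sequence, shown as a simplified, self-contained model. It assumes the reader marks every read as uncommitted even when the batch is empty, and that the source thread only commits non-empty batches. SpoolReaderModel and its methods are hypothetical stand-ins, not Flume's ReliableSpoolingFileEventReader API; each "file" is a list of lines and each line becomes one event body.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical model of the failing sequence, not Flume's API.
public class SpoolReaderModel {
  private final LinkedHashMap<String, List<String>> spool; // file name -> lines, in spool order
  private String current;                                  // null: no file open
  private int pos;                                         // next line to read
  private int mark;                                        // position at the last commit
  private boolean committed = true;
  final List<String> log = new ArrayList<>();

  SpoolReaderModel(LinkedHashMap<String, List<String>> spool) {
    this.spool = spool;
  }

  private String nextFile() {
    pos = 0;
    mark = 0;
    return spool.isEmpty() ? null : spool.keySet().iterator().next();
  }

  private List<String> readRemaining() {
    List<String> lines = spool.get(current);
    List<String> batch = new ArrayList<>(lines.subList(pos, lines.size()));
    pos = lines.size();
    return batch;
  }

  List<String> readEvents() {
    if (!committed) {
      if (current == null) {
        throw new IllegalStateException("File should not roll when commit is outstanding.");
      }
      log.add("reset " + current);     // "Last read was never committed - resetting mark position."
      pos = mark;
    } else {
      if (current == null) {
        current = nextFile();
      }
      if (current == null) {
        return Collections.emptyList();
      }
    }
    List<String> events = readRemaining();
    if (events.isEmpty()) {            // end of file: roll it and try the next one
      log.add("roll " + current);      // "Preparing to move file ..."
      spool.remove(current);
      current = nextFile();
      if (current == null) {
        return Collections.emptyList(); // returns with the commit flag unchanged
      }
      events = readRemaining();
    }
    committed = false;                 // set even when the batch is empty
    return events;
  }

  void commit() {
    mark = pos;
    committed = true;
  }

  // Mirrors one run of the source thread: read until an empty batch, committing each non-empty one.
  List<String> poll() {
    List<String> delivered = new ArrayList<>();
    for (List<String> batch = readEvents(); !batch.isEmpty(); batch = readEvents()) {
      delivered.addAll(batch);
      commit();
    }
    return delivered;
  }

  public static void main(String[] args) {
    LinkedHashMap<String, List<String>> spool = new LinkedHashMap<>();
    spool.put("file1", Collections.singletonList("some data"));
    spool.put("file2", Collections.<String>emptyList());
    SpoolReaderModel reader = new SpoolReaderModel(spool);
    System.out.println(reader.poll());    // [some data]
    System.out.println(reader.poll());    // [] (file2 produced no event)
    try {
      reader.poll();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // File should not roll when commit is outstanding.
    }
    System.out.println(reader.log);       // [roll file1, reset file2, roll file2]
  }
}
```

Each poll() stands for one scheduled run of the source thread. The second run produces the "resetting mark position" step and the file2 roll, and ends with no current file but a commit still outstanding; the third run rejects exactly that state.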


