You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/08/09 01:15:33 UTC

flume git commit: FLUME-2619. Spooldir source should log channel exceptions

Repository: flume
Updated Branches:
  refs/heads/trunk 4b74aa286 -> 1422f7330


FLUME-2619. Spooldir source should log channel exceptions

Reviewed by Denes Arvay and Mike Percy.

(Bessenyei Bal�zs Don�t via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1422f733
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1422f733
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1422f733

Branch: refs/heads/trunk
Commit: 1422f733007dbb78caae7e5135bc33470e88502a
Parents: 4b74aa2
Author: Bessenyei Bal�zs Don�t <be...@cloudera.com>
Authored: Mon Aug 8 18:09:44 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Aug 8 18:11:58 2016 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/ChannelProcessor.java  | 24 +++++-----
 .../flume/source/SpoolDirectorySource.java      | 45 +++++++++++++------
 .../flume/source/TestSpoolDirectorySource.java  | 46 +++++++++++---------
 3 files changed, 71 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 1cce137..6987860 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * A channel processor exposes operations to put {@link Event}s into
  * {@link Channel}s. These operations will propagate a {@link ChannelException}
  * if any errors occur while attempting to write to {@code required} channels.
- *
+ * <p>
  * Each channel processor instance is configured with a {@link ChannelSelector}
  * instance that specifies which channels are
  * {@linkplain ChannelSelector#getRequiredChannels(Event) required} and which
@@ -73,6 +74,7 @@ public class ChannelProcessor implements Configurable {
 
   /**
    * The Context of the associated Source is passed.
+   *
    * @param context
    */
   @Override
@@ -103,7 +105,7 @@ public class ChannelProcessor implements Configurable {
       if (type == null) {
         LOG.error("Type not specified for interceptor " + interceptorName);
         throw new FlumeException("Interceptor.Type not specified for " +
-          interceptorName);
+            interceptorName);
       }
       try {
         Interceptor.Builder builder = factory.newInstance(type);
@@ -132,7 +134,7 @@ public class ChannelProcessor implements Configurable {
    * Attempts to {@linkplain Channel#put(Event) put} the given events into each
    * configured channel. If any {@code required} channel throws a
    * {@link ChannelException}, that exception will be propagated.
-   *
+   * <p>
    * <p>Note that if multiple channels are configured, some {@link Transaction}s
    * may have already been committed while others may be rolled back in the
    * case of an exception.
@@ -165,7 +167,7 @@ public class ChannelProcessor implements Configurable {
 
       List<Channel> optChannels = selector.getOptionalChannels(event);
 
-      for (Channel ch: optChannels) {
+      for (Channel ch : optChannels) {
         List<Event> eventQueue = optChannelQueue.get(ch);
         if (eventQueue == null) {
           eventQueue = new ArrayList<Event>();
@@ -193,9 +195,10 @@ public class ChannelProcessor implements Configurable {
       } catch (Throwable t) {
         tx.rollback();
         if (t instanceof Error) {
-          LOG.error("Error while writing to required channel: " +
-              reqChannel, t);
+          LOG.error("Error while writing to required channel: " + reqChannel, t);
           throw (Error) t;
+        } else if (t instanceof ChannelException) {
+          throw (ChannelException) t;
         } else {
           throw new ChannelException("Unable to put batch on required " +
               "channel: " + reqChannel, t);
@@ -216,7 +219,7 @@ public class ChannelProcessor implements Configurable {
 
         List<Event> batch = optChannelQueue.get(optChannel);
 
-        for (Event event : batch ) {
+        for (Event event : batch) {
           optChannel.put(event);
         }
 
@@ -239,7 +242,7 @@ public class ChannelProcessor implements Configurable {
    * Attempts to {@linkplain Channel#put(Event) put} the given event into each
    * configured channel. If any {@code required} channel throws a
    * {@link ChannelException}, that exception will be propagated.
-   *
+   * <p>
    * <p>Note that if multiple channels are configured, some {@link Transaction}s
    * may have already been committed while others may be rolled back in the
    * case of an exception.
@@ -268,9 +271,10 @@ public class ChannelProcessor implements Configurable {
       } catch (Throwable t) {
         tx.rollback();
         if (t instanceof Error) {
-          LOG.error("Error while writing to required channel: " +
-              reqChannel, t);
+          LOG.error("Error while writing to required channel: " + reqChannel, t);
           throw (Error) t;
+        } else if (t instanceof ChannelException) {
+          throw (ChannelException) t;
         } else {
           throw new ChannelException("Unable to put event on required " +
               "channel: " + reqChannel, t);

http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index d88cc1d..c8c7cda 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
@@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
 
 public class SpoolDirectorySource extends AbstractSource
-                                  implements Configurable, EventDrivenSource {
+    implements Configurable, EventDrivenSource {
 
   private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);
 
@@ -70,6 +71,7 @@ public class SpoolDirectorySource extends AbstractSource
   private ScheduledExecutorService executor;
   private boolean backoff = true;
   private boolean hitChannelException = false;
+  private boolean hitChannelFullException = false;
   private int maxBackoff;
   private ConsumeOrder consumeOrder;
   private int pollDelay;
@@ -158,7 +160,7 @@ public class SpoolDirectorySource extends AbstractSource
     inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
     decodeErrorPolicy = DecodeErrorPolicy.valueOf(
         context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
-        .toUpperCase(Locale.ENGLISH));
+            .toUpperCase(Locale.ENGLISH));
 
     ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
     trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
@@ -196,10 +198,10 @@ public class SpoolDirectorySource extends AbstractSource
   }
 
 
-
   /**
    * The class always backs off, this exists only so that we can test without
    * taking a really long time.
+   *
    * @param backoff - whether the source should backoff if the channel is full
    */
   @VisibleForTesting
@@ -208,11 +210,16 @@ public class SpoolDirectorySource extends AbstractSource
   }
 
   @VisibleForTesting
-  protected boolean hitChannelException() {
+  protected boolean didHitChannelException() {
     return hitChannelException;
   }
 
   @VisibleForTesting
+  protected boolean didHitChannelFullException() {
+    return hitChannelFullException;
+  }
+
+  @VisibleForTesting
   protected SourceCounter getSourceCounter() {
     return sourceCounter;
   }
@@ -227,7 +234,7 @@ public class SpoolDirectorySource extends AbstractSource
     private SourceCounter sourceCounter;
 
     public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
-        SourceCounter sourceCounter) {
+                                  SourceCounter sourceCounter) {
       this.reader = reader;
       this.sourceCounter = sourceCounter;
     }
@@ -247,17 +254,19 @@ public class SpoolDirectorySource extends AbstractSource
           try {
             getChannelProcessor().processEventBatch(events);
             reader.commit();
-          } catch (ChannelException ex) {
+          } catch (ChannelFullException ex) {
             logger.warn("The channel is full, and cannot write data now. The " +
-                "source will try again after " + String.valueOf(backoffInterval) +
+                "source will try again after " + backoffInterval +
+                " milliseconds");
+            hitChannelFullException = true;
+            backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
+            continue;
+          } catch (ChannelException ex) {
+            logger.warn("The channel threw an exception, and cannot write data now. The " +
+                "source will try again after " + backoffInterval +
                 " milliseconds");
             hitChannelException = true;
-            if (backoff) {
-              TimeUnit.MILLISECONDS.sleep(backoffInterval);
-              backoffInterval = backoffInterval << 1;
-              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
-                                backoffInterval;
-            }
+            backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
             continue;
           }
           backoffInterval = 250;
@@ -272,5 +281,15 @@ public class SpoolDirectorySource extends AbstractSource
         Throwables.propagate(t);
       }
     }
+
+    private int waitAndGetNewBackoffInterval(int backoffInterval) throws InterruptedException {
+      if (backoff) {
+        TimeUnit.MILLISECONDS.sleep(backoffInterval);
+        backoffInterval = backoffInterval << 1;
+        backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
+            backoffInterval;
+      }
+      return backoffInterval;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 82c5351..0182d21 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -74,6 +74,7 @@ public class TestSpoolDirectorySource {
 
   /**
    * Helper method to recursively clean up testing directory
+   *
    * @param directory the directory to clean up
    */
   private void deleteFiles(File directory) {
@@ -87,7 +88,7 @@ public class TestSpoolDirectorySource {
     }
   }
 
-  @Test (expected = IllegalArgumentException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testInvalidSortOrder() {
     Context context = new Context();
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
@@ -102,7 +103,7 @@ public class TestSpoolDirectorySource {
     Context context = new Context();
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
         tmpDir.getAbsolutePath());
-    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, 
+    context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
         "oLdESt");
     Configurables.configure(source, context);
     context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
@@ -110,17 +111,17 @@ public class TestSpoolDirectorySource {
     Configurables.configure(source, context);
     context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
         "rAnDom");
-    Configurables.configure(source, context);    
+    Configurables.configure(source, context);
   }
-  
+
   @Test
   public void testPutFilenameHeader() throws IOException, InterruptedException {
     Context context = new Context();
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-                "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
-                f1, Charsets.UTF_8);
+            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+        f1, Charsets.UTF_8);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
         tmpDir.getAbsolutePath());
@@ -155,7 +156,7 @@ public class TestSpoolDirectorySource {
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-        "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
         f1, Charsets.UTF_8);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
@@ -192,18 +193,18 @@ public class TestSpoolDirectorySource {
     Assert.assertTrue("source directories must be created", directoriesCreated);
 
     final String FILE_NAME = "recursion_file.txt";
-    File f1 = new File(subDir,  FILE_NAME);
+    File f1 = new File(subDir, FILE_NAME);
     String origBody = "file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
         "file1line5\nfile1line6\nfile1line7\nfile1line8\n";
     Files.write(origBody, f1, Charsets.UTF_8);
 
     Context context = new Context();
     context.put(SpoolDirectorySourceConfigurationConstants.RECURSIVE_DIRECTORY_SEARCH,
-            "true"); // enable recursion, so we should find the file we created above
+        "true"); // enable recursion, so we should find the file we created above
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
-            tmpDir.getAbsolutePath()); // spool set to root dir
+        tmpDir.getAbsolutePath()); // spool set to root dir
     context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER,
-            "true"); // put the file name in the "file" header
+        "true"); // put the file name in the "file" header
 
     Configurables.configure(source, context);
     source.start();
@@ -224,7 +225,7 @@ public class TestSpoolDirectorySource {
 
     Assert.assertNotNull("Event headers must not be null", e.getHeaders());
     Assert.assertTrue("File header value did not end with expected filename",
-            e.getHeaders().get("file").endsWith(FILE_NAME));
+        e.getHeaders().get("file").endsWith(FILE_NAME));
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     do { // collecting the whole body
@@ -256,7 +257,6 @@ public class TestSpoolDirectorySource {
     Assert.assertTrue("source directories must be created", directoriesCreated);
 
 
-
     File f1 = new File(subDir.getAbsolutePath() + "/file1.txt");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
@@ -364,12 +364,12 @@ public class TestSpoolDirectorySource {
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
 
     Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
-        "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+            "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
         f1, Charsets.UTF_8);
 
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
-                tmpDir.getAbsolutePath());
+        tmpDir.getAbsolutePath());
 
     context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2");
     Configurables.configure(source, context);
@@ -377,10 +377,16 @@ public class TestSpoolDirectorySource {
     source.start();
 
     // Wait for the source to read enough events to fill up the channel.
-    while (!source.hitChannelException()) {
-      Thread.sleep(50);
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 5000 && !source.didHitChannelFullException()) {
+      Thread.sleep(10);
     }
 
+    Assert.assertTrue("Expected to hit ChannelFullException, but did not!",
+        source.didHitChannelFullException());
+
+
     List<String> dataOut = Lists.newArrayList();
 
     for (int i = 0; i < 8; ) {
@@ -399,8 +405,6 @@ public class TestSpoolDirectorySource {
       tx.commit();
       tx.close();
     }
-    Assert.assertTrue("Expected to hit ChannelException, but did not!",
-                      source.hitChannelException());
     Assert.assertEquals(8, dataOut.size());
     source.stop();
   }
@@ -422,7 +426,7 @@ public class TestSpoolDirectorySource {
     Files.touch(f4);
 
     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
-                tmpDir.getAbsolutePath());
+        tmpDir.getAbsolutePath());
     Configurables.configure(source, context);
     source.start();
 
@@ -431,7 +435,7 @@ public class TestSpoolDirectorySource {
 
     Assert.assertFalse("Server did not error", source.hasFatalError());
     Assert.assertEquals("One message was read",
-                        1, source.getSourceCounter().getEventAcceptedCount());
+        1, source.getSourceCounter().getEventAcceptedCount());
     source.stop();
   }
 }