You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/08/09 01:15:33 UTC
flume git commit: FLUME-2619. Spooldir source should log channel
exceptions
Repository: flume
Updated Branches:
refs/heads/trunk 4b74aa286 -> 1422f7330
FLUME-2619. Spooldir source should log channel exceptions
Reviewed by Denes Arvay and Mike Percy.
(Bessenyei Bal�zs Don�t via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1422f733
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1422f733
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1422f733
Branch: refs/heads/trunk
Commit: 1422f733007dbb78caae7e5135bc33470e88502a
Parents: 4b74aa2
Author: Bessenyei Bal�zs Don�t <be...@cloudera.com>
Authored: Mon Aug 8 18:09:44 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Aug 8 18:11:58 2016 -0700
----------------------------------------------------------------------
.../apache/flume/channel/ChannelProcessor.java | 24 +++++-----
.../flume/source/SpoolDirectorySource.java | 45 +++++++++++++------
.../flume/source/TestSpoolDirectorySource.java | 46 +++++++++++---------
3 files changed, 71 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 1cce137..6987860 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
* A channel processor exposes operations to put {@link Event}s into
* {@link Channel}s. These operations will propagate a {@link ChannelException}
* if any errors occur while attempting to write to {@code required} channels.
- *
+ * <p>
* Each channel processor instance is configured with a {@link ChannelSelector}
* instance that specifies which channels are
* {@linkplain ChannelSelector#getRequiredChannels(Event) required} and which
@@ -73,6 +74,7 @@ public class ChannelProcessor implements Configurable {
/**
* The Context of the associated Source is passed.
+ *
* @param context
*/
@Override
@@ -103,7 +105,7 @@ public class ChannelProcessor implements Configurable {
if (type == null) {
LOG.error("Type not specified for interceptor " + interceptorName);
throw new FlumeException("Interceptor.Type not specified for " +
- interceptorName);
+ interceptorName);
}
try {
Interceptor.Builder builder = factory.newInstance(type);
@@ -132,7 +134,7 @@ public class ChannelProcessor implements Configurable {
* Attempts to {@linkplain Channel#put(Event) put} the given events into each
* configured channel. If any {@code required} channel throws a
* {@link ChannelException}, that exception will be propagated.
- *
+ * <p>
* <p>Note that if multiple channels are configured, some {@link Transaction}s
* may have already been committed while others may be rolled back in the
* case of an exception.
@@ -165,7 +167,7 @@ public class ChannelProcessor implements Configurable {
List<Channel> optChannels = selector.getOptionalChannels(event);
- for (Channel ch: optChannels) {
+ for (Channel ch : optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
@@ -193,9 +195,10 @@ public class ChannelProcessor implements Configurable {
} catch (Throwable t) {
tx.rollback();
if (t instanceof Error) {
- LOG.error("Error while writing to required channel: " +
- reqChannel, t);
+ LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
+ } else if (t instanceof ChannelException) {
+ throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
@@ -216,7 +219,7 @@ public class ChannelProcessor implements Configurable {
List<Event> batch = optChannelQueue.get(optChannel);
- for (Event event : batch ) {
+ for (Event event : batch) {
optChannel.put(event);
}
@@ -239,7 +242,7 @@ public class ChannelProcessor implements Configurable {
* Attempts to {@linkplain Channel#put(Event) put} the given event into each
* configured channel. If any {@code required} channel throws a
* {@link ChannelException}, that exception will be propagated.
- *
+ * <p>
* <p>Note that if multiple channels are configured, some {@link Transaction}s
* may have already been committed while others may be rolled back in the
* case of an exception.
@@ -268,9 +271,10 @@ public class ChannelProcessor implements Configurable {
} catch (Throwable t) {
tx.rollback();
if (t instanceof Error) {
- LOG.error("Error while writing to required channel: " +
- reqChannel, t);
+ LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
+ } else if (t instanceof ChannelException) {
+ throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put event on required " +
"channel: " + reqChannel, t);
http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index d88cc1d..c8c7cda 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
@@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
public class SpoolDirectorySource extends AbstractSource
- implements Configurable, EventDrivenSource {
+ implements Configurable, EventDrivenSource {
private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);
@@ -70,6 +71,7 @@ public class SpoolDirectorySource extends AbstractSource
private ScheduledExecutorService executor;
private boolean backoff = true;
private boolean hitChannelException = false;
+ private boolean hitChannelFullException = false;
private int maxBackoff;
private ConsumeOrder consumeOrder;
private int pollDelay;
@@ -158,7 +160,7 @@ public class SpoolDirectorySource extends AbstractSource
inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
decodeErrorPolicy = DecodeErrorPolicy.valueOf(
context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
- .toUpperCase(Locale.ENGLISH));
+ .toUpperCase(Locale.ENGLISH));
ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
@@ -196,10 +198,10 @@ public class SpoolDirectorySource extends AbstractSource
}
-
/**
* The class always backs off, this exists only so that we can test without
* taking a really long time.
+ *
* @param backoff - whether the source should backoff if the channel is full
*/
@VisibleForTesting
@@ -208,11 +210,16 @@ public class SpoolDirectorySource extends AbstractSource
}
@VisibleForTesting
- protected boolean hitChannelException() {
+ protected boolean didHitChannelException() {
return hitChannelException;
}
@VisibleForTesting
+ protected boolean didHitChannelFullException() {
+ return hitChannelFullException;
+ }
+
+ @VisibleForTesting
protected SourceCounter getSourceCounter() {
return sourceCounter;
}
@@ -227,7 +234,7 @@ public class SpoolDirectorySource extends AbstractSource
private SourceCounter sourceCounter;
public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
- SourceCounter sourceCounter) {
+ SourceCounter sourceCounter) {
this.reader = reader;
this.sourceCounter = sourceCounter;
}
@@ -247,17 +254,19 @@ public class SpoolDirectorySource extends AbstractSource
try {
getChannelProcessor().processEventBatch(events);
reader.commit();
- } catch (ChannelException ex) {
+ } catch (ChannelFullException ex) {
logger.warn("The channel is full, and cannot write data now. The " +
- "source will try again after " + String.valueOf(backoffInterval) +
+ "source will try again after " + backoffInterval +
+ " milliseconds");
+ hitChannelFullException = true;
+ backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
+ continue;
+ } catch (ChannelException ex) {
+ logger.warn("The channel threw an exception, and cannot write data now. The " +
+ "source will try again after " + backoffInterval +
" milliseconds");
hitChannelException = true;
- if (backoff) {
- TimeUnit.MILLISECONDS.sleep(backoffInterval);
- backoffInterval = backoffInterval << 1;
- backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
- backoffInterval;
- }
+ backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
continue;
}
backoffInterval = 250;
@@ -272,5 +281,15 @@ public class SpoolDirectorySource extends AbstractSource
Throwables.propagate(t);
}
}
+
+ private int waitAndGetNewBackoffInterval(int backoffInterval) throws InterruptedException {
+ if (backoff) {
+ TimeUnit.MILLISECONDS.sleep(backoffInterval);
+ backoffInterval = backoffInterval << 1;
+ backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
+ backoffInterval;
+ }
+ return backoffInterval;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 82c5351..0182d21 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -74,6 +74,7 @@ public class TestSpoolDirectorySource {
/**
* Helper method to recursively clean up testing directory
+ *
* @param directory the directory to clean up
*/
private void deleteFiles(File directory) {
@@ -87,7 +88,7 @@ public class TestSpoolDirectorySource {
}
}
- @Test (expected = IllegalArgumentException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testInvalidSortOrder() {
Context context = new Context();
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
@@ -102,7 +103,7 @@ public class TestSpoolDirectorySource {
Context context = new Context();
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
tmpDir.getAbsolutePath());
- context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
+ context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
"oLdESt");
Configurables.configure(source, context);
context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
@@ -110,17 +111,17 @@ public class TestSpoolDirectorySource {
Configurables.configure(source, context);
context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER,
"rAnDom");
- Configurables.configure(source, context);
+ Configurables.configure(source, context);
}
-
+
@Test
public void testPutFilenameHeader() throws IOException, InterruptedException {
Context context = new Context();
File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
- "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
- f1, Charsets.UTF_8);
+ "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+ f1, Charsets.UTF_8);
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
tmpDir.getAbsolutePath());
@@ -155,7 +156,7 @@ public class TestSpoolDirectorySource {
File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
- "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+ "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
f1, Charsets.UTF_8);
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
@@ -192,18 +193,18 @@ public class TestSpoolDirectorySource {
Assert.assertTrue("source directories must be created", directoriesCreated);
final String FILE_NAME = "recursion_file.txt";
- File f1 = new File(subDir, FILE_NAME);
+ File f1 = new File(subDir, FILE_NAME);
String origBody = "file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
"file1line5\nfile1line6\nfile1line7\nfile1line8\n";
Files.write(origBody, f1, Charsets.UTF_8);
Context context = new Context();
context.put(SpoolDirectorySourceConfigurationConstants.RECURSIVE_DIRECTORY_SEARCH,
- "true"); // enable recursion, so we should find the file we created above
+ "true"); // enable recursion, so we should find the file we created above
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
- tmpDir.getAbsolutePath()); // spool set to root dir
+ tmpDir.getAbsolutePath()); // spool set to root dir
context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER,
- "true"); // put the file name in the "file" header
+ "true"); // put the file name in the "file" header
Configurables.configure(source, context);
source.start();
@@ -224,7 +225,7 @@ public class TestSpoolDirectorySource {
Assert.assertNotNull("Event headers must not be null", e.getHeaders());
Assert.assertTrue("File header value did not end with expected filename",
- e.getHeaders().get("file").endsWith(FILE_NAME));
+ e.getHeaders().get("file").endsWith(FILE_NAME));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
do { // collecting the whole body
@@ -256,7 +257,6 @@ public class TestSpoolDirectorySource {
Assert.assertTrue("source directories must be created", directoriesCreated);
-
File f1 = new File(subDir.getAbsolutePath() + "/file1.txt");
Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
@@ -364,12 +364,12 @@ public class TestSpoolDirectorySource {
File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
- "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+ "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
f1, Charsets.UTF_8);
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
- tmpDir.getAbsolutePath());
+ tmpDir.getAbsolutePath());
context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2");
Configurables.configure(source, context);
@@ -377,10 +377,16 @@ public class TestSpoolDirectorySource {
source.start();
// Wait for the source to read enough events to fill up the channel.
- while (!source.hitChannelException()) {
- Thread.sleep(50);
+
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < 5000 && !source.didHitChannelFullException()) {
+ Thread.sleep(10);
}
+ Assert.assertTrue("Expected to hit ChannelFullException, but did not!",
+ source.didHitChannelFullException());
+
+
List<String> dataOut = Lists.newArrayList();
for (int i = 0; i < 8; ) {
@@ -399,8 +405,6 @@ public class TestSpoolDirectorySource {
tx.commit();
tx.close();
}
- Assert.assertTrue("Expected to hit ChannelException, but did not!",
- source.hitChannelException());
Assert.assertEquals(8, dataOut.size());
source.stop();
}
@@ -422,7 +426,7 @@ public class TestSpoolDirectorySource {
Files.touch(f4);
context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
- tmpDir.getAbsolutePath());
+ tmpDir.getAbsolutePath());
Configurables.configure(source, context);
source.start();
@@ -431,7 +435,7 @@ public class TestSpoolDirectorySource {
Assert.assertFalse("Server did not error", source.hasFatalError());
Assert.assertEquals("One message was read",
- 1, source.getSourceCounter().getEventAcceptedCount());
+ 1, source.getSourceCounter().getEventAcceptedCount());
source.stop();
}
}