Posted to commits@flume.apache.org by mp...@apache.org on 2013/03/01 00:06:59 UTC
git commit: FLUME-1925. HDFS timeouts should not starve other threads.
Updated Branches:
refs/heads/flume-1.4 bedfc4e9b -> 67d44cd84
FLUME-1925. HDFS timeouts should not starve other threads.
(Hari Shreedharan via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/67d44cd8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/67d44cd8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/67d44cd8
Branch: refs/heads/flume-1.4
Commit: 67d44cd84074ccfbc0ab295053852ea3acdc52dc
Parents: bedfc4e
Author: Mike Percy <mp...@apache.org>
Authored: Thu Feb 28 15:06:13 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Feb 28 15:06:49 2013 -0800
----------------------------------------------------------------------
.../org/apache/flume/sink/hdfs/BucketWriter.java | 145 +++++++++++----
.../org/apache/flume/sink/hdfs/HDFSEventSink.java | 99 +----------
.../apache/flume/sink/hdfs/TestBucketWriter.java | 16 +-
3 files changed, 126 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
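In short: timeout enforcement moves from HDFSEventSink into BucketWriter itself. Each potentially blocking HDFS operation (open, append, sync, close, and the exists/rename in renameBucket) is now wrapped in a Callable and run on the shared callTimeoutPool, bounded by callTimeout, instead of the sink wrapping whole synchronized BucketWriter methods. Below is a minimal standalone sketch of that wrap-and-wait pattern, written with lambdas for brevity (the patch itself uses anonymous Callable classes; the class and method names here are illustrative, not Flume's):

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class CallTimeoutSketch {
      // Analogous to the sink's callTimeoutPool, shared across buckets.
      private static final ExecutorService CALL_POOL =
          Executors.newFixedThreadPool(4);

      // Simplified form of BucketWriter.callWithTimeout(): run the callable
      // on the pool and bound the wait; <= 0 means wait indefinitely.
      static <T> T callWithTimeout(Callable<T> callable, long timeoutMs)
          throws IOException, InterruptedException {
        Future<T> future = CALL_POOL.submit(callable);
        try {
          return timeoutMs > 0
              ? future.get(timeoutMs, TimeUnit.MILLISECONDS)
              : future.get();
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the worker stuck in the slow call
          throw new IOException("Callable timed out after " + timeoutMs
              + " ms", e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause; // surface the original HDFS error
          }
          throw new RuntimeException(cause);
        }
      }

      public static void main(String[] args) throws Exception {
        Callable<Void> slowCall = () -> { // stand-in for writer.append()/sync()
          Thread.sleep(5_000);
          return null;
        };
        try {
          callWithTimeout(slowCall, 1_000);
        } catch (IOException expected) {
          System.out.println(expected.getMessage());
        } finally {
          CALL_POOL.shutdownNow();
        }
      }
    }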
http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 0786857..cdc37f6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -21,9 +21,14 @@ package org.apache.flume.sink.hdfs;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Clock;
@@ -34,7 +39,6 @@ import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -93,6 +97,8 @@ class BucketWriter {
private volatile ScheduledFuture<Void> idleFuture;
private final WriterCallback onIdleCallback;
private final String onIdleCallbackPath;
+ private final long callTimeout;
+ private final ExecutorService callTimeoutPool;
private Clock clock = new SystemClock();
@@ -101,12 +107,13 @@ class BucketWriter {
protected boolean idleClosed = false;
BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
- Context context, String filePath, String fileName, String inUsePrefix,
- String inUseSuffix, String fileSuffix, CompressionCodec codeC,
- CompressionType compType, HDFSWriter writer,
- ScheduledExecutorService timedRollerPool, UserGroupInformation user,
- SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
- String onIdleCallbackPath) {
+ Context context, String filePath, String fileName, String inUsePrefix,
+ String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+ CompressionType compType, HDFSWriter writer,
+ ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+ SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+ String onIdleCallbackPath, long callTimeout,
+ ExecutorService callTimeoutPool) {
this.rollInterval = rollInterval;
this.rollSize = rollSize;
this.rollCount = rollCount;
@@ -125,7 +132,8 @@ class BucketWriter {
this.idleTimeout = idleTimeout;
this.onIdleCallback = onIdleCallback;
this.onIdleCallbackPath = onIdleCallbackPath;
-
+ this.callTimeout = callTimeout;
+ this.callTimeoutPool = callTimeoutPool;
fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
isOpen = false;
@@ -193,7 +201,7 @@ class BucketWriter {
throw new IOException("Invalid file settings");
}
- Configuration config = new Configuration();
+ final Configuration config = new Configuration();
// disable FileSystem JVM shutdown hook
config.setBoolean("fs.automatic.close", false);
@@ -223,16 +231,22 @@ class BucketWriter {
targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
LOG.info("Creating " + bucketPath);
- if (codeC == null) {
- // Need to get reference to FS using above config before underlying
- // writer does in order to avoid shutdown hook & IllegalStateExceptions
- fileSystem = new Path(bucketPath).getFileSystem(config);
- writer.open(bucketPath);
- } else {
- // need to get reference to FS before writer does to avoid shutdown hook
- fileSystem = new Path(bucketPath).getFileSystem(config);
- writer.open(bucketPath, codeC, compType);
- }
+ callWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (codeC == null) {
+ // Need to get reference to FS using above config before underlying
+ // writer does in order to avoid shutdown hook & IllegalStateExceptions
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ writer.open(bucketPath);
+ } else {
+ // need to get reference to FS before writer does to avoid shutdown hook
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ writer.open(bucketPath, codeC, compType);
+ }
+ return null;
+ }
+ });
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
if (ex instanceof IOException) {
@@ -287,11 +301,17 @@ class BucketWriter {
* doClose() must only be called by close()
* @throws IOException
*/
- private void doClose() throws IOException {
+ private void doClose() throws IOException, InterruptedException {
LOG.debug("Closing {}", bucketPath);
if (isOpen) {
try {
- writer.close(); // could block
+ callWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ writer.close(); // could block
+ return null;
+ }
+ });
sinkCounter.incrementConnectionClosedCount();
} catch (IOException e) {
LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
@@ -361,8 +381,14 @@ class BucketWriter {
* doFlush() must only be called by flush()
* @throws IOException
*/
- private void doFlush() throws IOException {
- writer.sync(); // could block
+ private void doFlush() throws IOException, InterruptedException {
+ callWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ writer.sync(); // could block
+ return null;
+ }
+ });
batchCounter = 0;
}
@@ -378,7 +404,7 @@ class BucketWriter {
* @throws IOException
* @throws InterruptedException
*/
- public synchronized void append(Event event)
+ public synchronized void append(final Event event)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
if (!isOpen) {
@@ -398,7 +424,13 @@ class BucketWriter {
// write the event
try {
sinkCounter.incrementEventDrainAttemptCount();
- writer.append(event); // could block
+ callWithTimeout(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ writer.append(event); // could block
+ return null;
+ }
+ });
} catch (IOException e) {
LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
bucketPath + ") and rethrowing exception.",
@@ -444,18 +476,24 @@ class BucketWriter {
/**
* Rename bucketPath file from .tmp to permanent location.
*/
- private void renameBucket() throws IOException {
+ private void renameBucket() throws IOException, InterruptedException {
if(bucketPath.equals(targetPath)) {
return;
}
- Path srcPath = new Path(bucketPath);
- Path dstPath = new Path(targetPath);
+ final Path srcPath = new Path(bucketPath);
+ final Path dstPath = new Path(targetPath);
- if(fileSystem.exists(srcPath)) { // could block
- LOG.info("Renaming " + srcPath + " to " + dstPath);
- fileSystem.rename(srcPath, dstPath); // could block
- }
+ callWithTimeout(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ if(fileSystem.exists(srcPath)) { // could block
+ LOG.info("Renaming " + srcPath + " to " + dstPath);
+ fileSystem.rename(srcPath, dstPath); // could block
+ }
+ return null;
+ }
+ });
}
@Override
@@ -485,4 +523,47 @@ class BucketWriter {
+ "taking too long.");
}
}
+
+ /**
+ * Execute the callable on a separate thread and wait for the completion
+ * for the specified amount of time in milliseconds. In case of timeout
+ * cancel the callable and throw an IOException
+ */
+ private <T> T callWithTimeout(Callable<T> callable)
+ throws IOException, InterruptedException {
+ Future<T> future = callTimeoutPool.submit(callable);
+ try {
+ if (callTimeout > 0) {
+ return future.get(callTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ return future.get();
+ }
+ } catch (TimeoutException eT) {
+ future.cancel(true);
+ sinkCounter.incrementConnectionFailedCount();
+ throw new IOException("Callable timed out after " + callTimeout + " ms",
+ eT);
+ } catch (ExecutionException e1) {
+ sinkCounter.incrementConnectionFailedCount();
+ Throwable cause = e1.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof Error) {
+ throw (Error)cause;
+ } else {
+ throw new RuntimeException(e1);
+ }
+ } catch (CancellationException ce) {
+ throw new InterruptedException(
+ "Blocked callable interrupted by rotation event");
+ } catch (InterruptedException ex) {
+ LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+ throw ex;
+ }
+ }
+
}
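One subtlety behind the new callWithTimeout() above: future.cancel(true) only delivers an interrupt, so a call that effectively ignores interruption (as a hung HDFS RPC can) keeps its pool thread occupied even after the timeout fires. Running these calls on a multi-thread pool is what lets one stuck bucket time out without blocking the others — the starvation the JIRA title refers to. A small illustrative sketch (not Flume code; the daemon thread factory is just so the demo JVM can exit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class StarvationSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
          Thread t = new Thread(r);
          t.setDaemon(true); // a stuck worker cannot keep the JVM alive
          return t;
        });

        Runnable hungHdfsCall = () -> { // simulates a call ignoring interrupts
          while (true) {
            try {
              Thread.sleep(60_000);
            } catch (InterruptedException swallowed) {
              // interrupt swallowed: the thread stays busy, as a hung RPC might
            }
          }
        };
        Future<?> hung = pool.submit(hungHdfsCall);

        try {
          hung.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          hung.cancel(true); // delivered, but ignored by the task above
        }

        // The second pool thread is still free, so other work proceeds.
        Future<String> healthy = pool.submit(() -> "healthy bucket still flushes");
        System.out.println(healthy.get());
      }
    }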
http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 76e3d1f..741ac90 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -26,15 +26,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.TimeZone;
import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
@@ -81,7 +76,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
private static final String defaultInUsePrefix = "";
private static final String defaultInUseSuffix = ".tmp";
private static final long defaultBatchSize = 100;
- private static final long defaultTxnEventMax = 100;
private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
private static final int defaultMaxOpenFiles = 5000;
@@ -336,47 +330,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
return codec;
}
- /**
- * Execute the callable on a separate thread and wait for the completion
- * for the specified amount of time in milliseconds. In case of timeout
- * cancel the callable and throw an IOException
- */
- private <T> T callWithTimeout(Callable<T> callable)
- throws IOException, InterruptedException {
- Future<T> future = callTimeoutPool.submit(callable);
- try {
- if (callTimeout > 0) {
- return future.get(callTimeout, TimeUnit.MILLISECONDS);
- } else {
- return future.get();
- }
- } catch (TimeoutException eT) {
- future.cancel(true);
- sinkCounter.incrementConnectionFailedCount();
- throw new IOException("Callable timed out after " + callTimeout + " ms",
- eT);
- } catch (ExecutionException e1) {
- sinkCounter.incrementConnectionFailedCount();
- Throwable cause = e1.getCause();
- if (cause instanceof IOException) {
- throw (IOException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else if (cause instanceof RuntimeException) {
- throw (RuntimeException) cause;
- } else if (cause instanceof Error) {
- throw (Error)cause;
- } else {
- throw new RuntimeException(e1);
- }
- } catch (CancellationException ce) {
- throw new InterruptedException(
- "Blocked callable interrupted by rotation event");
- } catch (InterruptedException ex) {
- LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
- throw ex;
- }
- }
/**
* Pull events out of channel and send it to HDFS. Take at most batchSize
@@ -423,7 +376,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
suffix, codeC, compType, hdfsWriter, timedRollerPool,
- proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
+ proxyTicket, sinkCounter, idleTimeout, idleCallback,
+ lookupPath, callTimeout, callTimeoutPool);
sfWriters.put(lookupPath, bucketWriter);
}
@@ -434,7 +388,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
}
// Write the data to HDFS
- append(bucketWriter, event);
+ bucketWriter.append(event);
}
if (txnEventCount == 0) {
@@ -447,7 +401,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
- flush(bucketWriter);
+ bucketWriter.flush();
}
transaction.commit();
@@ -482,7 +436,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
LOG.info("Closing {}", entry.getKey());
try {
- close(entry.getValue());
+ entry.getValue().close();
} catch (Exception ex) {
LOG.warn("Exception while closing " + entry.getKey() + ". " +
"Exception follows.", ex);
@@ -727,49 +681,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
" }";
}
- /**
- * Append to bucket writer with timeout enforced
- */
- private void append(final BucketWriter bucketWriter, final Event event)
- throws IOException, InterruptedException {
-
- // Write the data to HDFS
- callWithTimeout(new Callable<Void>() {
- public Void call() throws Exception {
- bucketWriter.append(event);
- return null;
- }
- });
- }
-
- /**
- * Flush bucket writer with timeout enforced
- */
- private void flush(final BucketWriter bucketWriter)
- throws IOException, InterruptedException {
-
- callWithTimeout(new Callable<Void>() {
- public Void call() throws Exception {
- bucketWriter.flush();
- return null;
- }
- });
- }
-
- /**
- * Close bucket writer with timeout enforced
- */
- private void close(final BucketWriter bucketWriter)
- throws IOException, InterruptedException {
-
- callWithTimeout(new Callable<Void>() {
- public Void call() throws Exception {
- bucketWriter.close();
- return null;
- }
- });
- }
-
@VisibleForTesting
void setBucketClock(Clock clock) {
BucketPath.setClock(clock);
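Note that the sink still owns callTimeout and callTimeoutPool; with its per-call append()/flush()/close() wrappers deleted, it now just hands both to every BucketWriter it constructs. For context, a sketch of building such a shared pool with Guava's ThreadFactoryBuilder — assuming a fixed-size, named pool like the sink's; the size and name format here are illustrative, not copied from HDFSEventSink:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class SinkPoolSketch {
      public static void main(String[] args) {
        int threadsPoolSize = 10; // illustrative pool size
        // One named, fixed-size pool shared by all of this sink's BucketWriters.
        ExecutorService callTimeoutPool = Executors.newFixedThreadPool(
            threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat("hdfs-call-%d").build());
        callTimeoutPool.shutdown();
      }
    }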
http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index ebe277c..99b6150 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -69,7 +69,7 @@ public class TestBucketWriter {
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
hdfsWriter, timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
- null, null);
+ null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -93,7 +93,7 @@ public class TestBucketWriter {
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
hdfsWriter, timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -119,7 +119,7 @@ public class TestBucketWriter {
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
hdfsWriter, timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
long startNanos = System.nanoTime();
@@ -205,7 +205,7 @@ public class TestBucketWriter {
path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -228,7 +228,7 @@ public class TestBucketWriter {
"/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
// Need to override system time use for test so we know what to expect
final long testTime = System.currentTimeMillis();
@@ -255,7 +255,7 @@ public class TestBucketWriter {
"/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
// Need to override system time use for test so we know what to expect
@@ -285,7 +285,7 @@ public class TestBucketWriter {
"/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
@@ -304,7 +304,7 @@ public class TestBucketWriter {
"/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
- 0, null, null);
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
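For operators, the knobs behind these new constructor arguments are the sink's existing hdfs.callTimeout (milliseconds) and hdfs.threadsPoolSize settings. A hedged properties example, with the agent and sink names invented for illustration and the 30000 ms timeout matching what the updated tests pass in:

    agent1.sinks.hdfs-sink.type = hdfs
    agent1.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/events/%Y-%m-%d
    agent1.sinks.hdfs-sink.hdfs.callTimeout = 30000
    agent1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10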