You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2013/11/07 23:54:36 UTC
git commit: FLUME-2235. idleFuture should be cancelled at the start of append
Updated Branches:
refs/heads/trunk e27ae5fdc -> 705abaf00
FLUME-2235. idleFuture should be cancelled at the start of append
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/705abaf0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/705abaf0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/705abaf0
Branch: refs/heads/trunk
Commit: 705abaf00fbf8ee69ac88cbccae47c1a33f4b4b2
Parents: e27ae5f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Nov 7 14:53:04 2013 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Nov 7 14:53:04 2013 -0800
----------------------------------------------------------------------
.../apache/flume/sink/hdfs/BucketWriter.java | 21 ++++++++++++++++++++
1 file changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/705abaf0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 65f4d2c..200d457 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -375,6 +375,27 @@ class BucketWriter {
public synchronized void append(final Event event)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
+ // If idleFuture is not null, cancel it before we move forward to avoid a
+ // close call in the middle of the append.
+ if(idleFuture != null) {
+ idleFuture.cancel(false);
+ // There is still a small race condition - if the idleFuture is already
+ // running, interrupting it can cause the HDFS close operation to throw,
+ // so we use cancel(false) and never interrupt a running task. If the
+ // future could not be cancelled, it is already running - wait for it to
+ // finish before attempting to write.
+ if(!idleFuture.isDone()) {
+ try {
+ idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException ex) {
+ LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
+ " file close may have failed", ex);
+ } catch (Exception ex) {
+ LOG.warn("Error while trying to cancel closing of idle file. ", ex);
+ }
+ }
+ idleFuture = null;
+ }
if (!isOpen) {
if(idleClosed) {
throw new IOException("This bucket writer was closed due to idling and this handle " +