You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2013/11/07 23:54:36 UTC

git commit: FLUME-2235. idleFuture should be cancelled at the start of append

Updated Branches:
  refs/heads/trunk e27ae5fdc -> 705abaf00


FLUME-2235. idleFuture should be cancelled at the start of append

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/705abaf0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/705abaf0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/705abaf0

Branch: refs/heads/trunk
Commit: 705abaf00fbf8ee69ac88cbccae47c1a33f4b4b2
Parents: e27ae5f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Nov 7 14:53:04 2013 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Nov 7 14:53:04 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/705abaf0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 65f4d2c..200d457 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -375,6 +375,27 @@ class BucketWriter {
   public synchronized void append(final Event event)
           throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
+    // If idleFuture is not null, cancel it before we move forward to avoid a
+    // close call in the middle of the append.
+    if(idleFuture != null) {
+      idleFuture.cancel(false);
+      // cancel(false) never interrupts a close that is already running,
+      // because interrupting it can make the HDFS close operation throw.
+      // That leaves a small race: if the future could not be cancelled, it
+      // is already running, so wait for it to finish before attempting to
+      // write.
+      if(!idleFuture.isDone()) {
+        try {
+          idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException ex) {
+          LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
+            " file close may have failed", ex);
+        } catch (Exception ex) {
+          LOG.warn("Error while trying to cancel closing of idle file. ", ex);
+        }
+      }
+      idleFuture = null;
+    }
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle " +