Posted to commits@flume.apache.org by sz...@apache.org on 2018/10/04 14:25:09 UTC

flume git commit: FLUME-2973 Deadlock in hdfs sink

Repository: flume
Updated Branches:
  refs/heads/trunk e08ab04de -> 1b4378396


FLUME-2973 Deadlock in hdfs sink

This PR is based on Yan Jian's fix and his test improvements.
It also contains the deadlock reproduction contributed by @adenes.
I have made minimal changes to those contributions.
Denes's test was used to verify the fix.
Yan's fix contains an optimization: it calls the close callback, which
removes the BucketWriter from the cache, as the first step of close().
This is useful because it shrinks the window in which other threads can
look up a BucketWriter that is already closing.
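
For reference, the shape of the fix as it lands in BucketWriter.java below:
close() is no longer synchronized; it first flips an AtomicBoolean guard and
runs the cache-removal callback, and only then enters the synchronized
doClose(). Before the fix, close() was synchronized and ran the callback
last, which allowed a lock-order inversion between the writer's monitor and
the sink-side lock guarding the writer cache. A minimal, self-contained
sketch of the new ordering follows (the class name and the removeFromCache
field are hypothetical stand-ins, not Flume API; the real callback is
runCloseAction()):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the locking pattern in the fix: the close callback (which
    // needs the owner's cache lock) runs BEFORE this object's monitor is
    // taken, so no thread ever holds the writer monitor while waiting for
    // the cache lock.
    class WriterSketch {
      private final AtomicBoolean closed = new AtomicBoolean();
      private final Runnable removeFromCache; // stand-in for runCloseAction()

      WriterSketch(Runnable removeFromCache) {
        this.removeFromCache = removeFromCache;
      }

      public void close(boolean callCloseCallback) {
        if (callCloseCallback) {
          if (closed.compareAndSet(false, true)) {
            removeFromCache.run(); // remove from the cache as soon as possible
          }
        }
        doClose(); // the writer monitor is taken only after the callback ran
      }

      private synchronized void doClose() {
        // flush/rename work, guarded by this object's monitor
      }
    }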

This closes #226

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major, Yan Jian, Denes Arvay via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1b437839
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1b437839
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1b437839

Branch: refs/heads/trunk
Commit: 1b4378396a441629fa0332d4814e053345c58ffb
Parents: e08ab04
Author: Endre Major <em...@cloudera.com>
Authored: Thu Oct 4 16:16:41 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Thu Oct 4 16:16:41 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    |  25 +-
 .../flume/sink/hdfs/TestBucketWriter.java       | 323 ++++++++++++++-----
 .../sink/hdfs/TestHDFSEventSinkDeadlock.java    | 127 ++++++++
 3 files changed, 394 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1b437839/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 62e5383..2462d8a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -112,7 +113,7 @@ class BucketWriter {
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
-  protected boolean closed = false;
+  protected AtomicBoolean closed = new AtomicBoolean();
   AtomicInteger renameTries = new AtomicInteger(0);
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
@@ -315,7 +316,7 @@ class BucketWriter {
    * @throws IOException On failure to rename if temp file exists.
    * @throws InterruptedException
    */
-  public synchronized void close() throws IOException, InterruptedException {
+  public void close() throws IOException, InterruptedException {
     close(false);
   }
 
@@ -381,7 +382,19 @@ class BucketWriter {
    * @throws IOException On failure to rename if temp file exists.
    * @throws InterruptedException
    */
-  public synchronized void close(boolean callCloseCallback)
+  public void close(boolean callCloseCallback)
+      throws IOException, InterruptedException {
+    if (callCloseCallback) {
+      if (closed.compareAndSet(false, true)) {
+        runCloseAction(); //remove from the cache as soon as possible
+      } else {
+        LOG.warn("This bucketWriter is already closing or closed.");
+      }
+    }
+    doClose();
+  }
+
+  private synchronized void doClose()
       throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     try {
@@ -431,10 +444,6 @@ class BucketWriter {
         timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
       }
     }
-    if (callCloseCallback) {
-      runCloseAction();
-      closed = true;
-    }
   }
 
   /**
@@ -534,7 +543,7 @@ class BucketWriter {
     // force a new bucket writer to be created. Roll count and roll size will
     // just reuse this one
     if (!isOpen) {
-      if (closed) {
+      if (closed.get()) {
         throw new BucketClosedException("This bucket writer was closed and " +
           "this handle is thus no longer valid");
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/1b437839/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 4221a5d..f96aa96 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -22,10 +22,12 @@ import com.google.common.base.Charsets;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.SystemClock;
 import org.apache.flume.auth.FlumeAuthenticationUtil;
 import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Calendar;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,11 +76,8 @@ public class TestBucketWriter {
   public void testEventCountingRoller() throws IOException, InterruptedException {
     int maxEvents = 100;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollCount(maxEvents).build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -97,11 +97,8 @@ public class TestBucketWriter {
   public void testSizeRoller() throws IOException, InterruptedException {
     int maxBytes = 300;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollSize(maxBytes).build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -124,16 +121,14 @@ public class TestBucketWriter {
     final AtomicBoolean calledBack = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        new HDFSEventSink.WriterCallback() {
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setOnCloseCallback(new HDFSEventSink.WriterCallback() {
           @Override
           public void run(String filePath) {
             calledBack.set(true);
           }
-        }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
+        }).build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -144,14 +139,11 @@ public class TestBucketWriter {
     // sleep to force a roll... wait 2x interval just to be sure
     Thread.sleep(2 * ROLL_INTERVAL * 1000L);
 
-    Assert.assertTrue(bucketWriter.closed);
+    Assert.assertTrue(bucketWriter.closed.get());
     Assert.assertTrue(calledBack.get());
 
-    bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL).build();
     // write one more event (to reopen a new file so we will roll again later)
     bucketWriter.append(e);
 
@@ -222,17 +214,17 @@ public class TestBucketWriter {
       }
     };
 
-    HDFSTextSerializer serializer = new HDFSTextSerializer();
+
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
     String path = tmpFile.getParent();
     String name = tmpFile.getName();
 
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, path, name, "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setFilePath(path)
+        .setFileName(name)
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -259,11 +251,11 @@ public class TestBucketWriter {
     };
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0, testClock);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setFileSuffix(suffix)
+        .setClock(testClock)
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -288,11 +280,11 @@ public class TestBucketWriter {
     };
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0, testClock);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setFileSuffix(suffix)
+        .setClock(testClock)
+        .build();
 
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
@@ -319,12 +311,14 @@ public class TestBucketWriter {
       }
     };
 
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix,
-        HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
-        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0, testClock
-    );
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setFileSuffix(suffix)
+        .setCodeC(HDFSEventSink.getCodec("gzip"))
+        .setCompType(SequenceFile.CompressionType.BLOCK)
+        .setClock(testClock)
+        .build();
+
 
 
 
@@ -342,11 +336,10 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer formatter = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setInUsePrefix(PREFIX)
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -361,11 +354,10 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer serializer = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setRollInterval(ROLL_INTERVAL)
+        .setInUseSuffix(SUFFIX)
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -379,17 +371,17 @@ public class TestBucketWriter {
     final String SUFFIX = "WELCOME_TO_THE_EREBOR";
     final AtomicBoolean callbackCalled = new AtomicBoolean(false);
 
-    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        new HDFSEventSink.WriterCallback() {
+    BucketWriter bucketWriter = new BucketWriterBuilder()
+        .setRollInterval(ROLL_INTERVAL)
+        .setInUseSuffix(SUFFIX)
+        .setOnCloseCallback(new HDFSEventSink.WriterCallback() {
           @Override
           public void run(String filePath) {
             callbackCalled.set(true);
           }
-        }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
+        })
+        .setOnCloseCallbackPath("blah")
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -398,8 +390,6 @@ public class TestBucketWriter {
     Assert.assertTrue(callbackCalled.get());
   }
 
-
-
   @Test
   public void testSequenceFileRenameRetries() throws Exception {
     SequenceFileRenameRetryCoreTest(1, true);
@@ -429,11 +419,18 @@ public class TestBucketWriter {
     context.put("hdfs.retryInterval", "1");
     context.put("hdfs.callTimeout", Long.toString(1000));
     MockFileSystem mockFs = new MockFileSystem(fs, numberOfRetriesRequired, closeSucceed);
-    BucketWriter bucketWriter = new BucketWriter(
-        0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
-        null, new MockDataStream(mockFs), timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 1, numberOfRetriesRequired);
+    MockDataStream writer = new MockDataStream(mockFs);
+    BucketWriter bucketWriter = new BucketWriterBuilder(writer)
+        .setRollCount(1)
+        .setBatchSize(1)
+        .setFilePath(hdfsPath)
+        .setFileName(hdfsPath)
+        .setInUsePrefix("singleBucket")
+        .setCompType(null)
+        .setRetryInterval(1)
+        .setMaxCloseTries(numberOfRetriesRequired)
+        .setWriter(writer)
+        .build();
 
     bucketWriter.setFileSystem(mockFs);
     // At this point, we checked if isFileClosed is available in
@@ -459,11 +456,10 @@ public class TestBucketWriter {
         FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs("alice");
 
     final int ROLL_COUNT = 1; // Cause a roll after every successful append().
-    BucketWriter bucketWriter = new BucketWriter(
-        0, 0, ROLL_COUNT, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, ugiProxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
+    BucketWriter bucketWriter = new BucketWriterBuilder(hdfsWriter)
+        .setProxyUser(ugiProxy)
+        .setRollCount(ROLL_COUNT)
+        .build();
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
 
@@ -497,4 +493,185 @@ public class TestBucketWriter {
     Assert.assertEquals("events written", 1, hdfsWriter.getEventsWritten());
     Assert.assertEquals("2 files should be closed", 2, hdfsWriter.getFilesClosed());
   }
+
+  private class BucketWriterBuilder {
+    private long rollInterval = 0;
+    private long rollSize = 0;
+    private long rollCount = 0;
+    private long batchSize = 0;
+    private Context context = TestBucketWriter.this.ctx;
+    private String filePath = "/tmp";
+    private String fileName = "file";
+    private String inUsePrefix = "";
+    private String inUseSuffix = ".tmp";
+    private String fileSuffix = null;
+    private CompressionCodec codeC = null;
+    private CompressionType compType = SequenceFile.CompressionType.NONE;
+    private HDFSWriter writer = null;
+    private ScheduledExecutorService timedRollerPool = TestBucketWriter.timedRollerPool;
+    private PrivilegedExecutor proxyUser = TestBucketWriter.proxy;
+    private SinkCounter sinkCounter = new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis());
+    private int idleTimeout = 0;
+    private WriterCallback onCloseCallback = null;
+    private String onCloseCallbackPath = null;
+    private long callTimeout = 30000;
+    private ExecutorService callTimeoutPool = Executors.newSingleThreadExecutor();
+    private long retryInterval = 0;
+    private int maxCloseTries = 0;
+    private Clock clock = null;
+
+    public BucketWriterBuilder() {
+    }
+
+    public BucketWriterBuilder(HDFSWriter writer) {
+      this.writer = writer;
+    }
+
+    public BucketWriterBuilder setRollInterval(long rollInterval) {
+      this.rollInterval = rollInterval;
+      return this;
+    }
+
+    public BucketWriterBuilder setRollSize(long rollSize) {
+      this.rollSize = rollSize;
+      return this;
+    }
+
+    public BucketWriterBuilder setRollCount(long rollCount) {
+      this.rollCount = rollCount;
+      return this;
+    }
+
+    public BucketWriterBuilder setBatchSize(long batchSize) {
+      this.batchSize = batchSize;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setContext(Context context) {
+      this.context = context;
+      return this;
+    }
+
+    public BucketWriterBuilder setFilePath(String filePath) {
+      this.filePath = filePath;
+      return this;
+    }
+
+    public BucketWriterBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public BucketWriterBuilder setInUsePrefix(String inUsePrefix) {
+      this.inUsePrefix = inUsePrefix;
+      return this;
+    }
+
+    public BucketWriterBuilder setInUseSuffix(String inUseSuffix) {
+      this.inUseSuffix = inUseSuffix;
+      return this;
+    }
+
+    public BucketWriterBuilder setFileSuffix(String fileSuffix) {
+      this.fileSuffix = fileSuffix;
+      return this;
+    }
+
+    public BucketWriterBuilder setCodeC(CompressionCodec codeC) {
+      this.codeC = codeC;
+      return this;
+    }
+
+    public BucketWriterBuilder setCompType(CompressionType compType) {
+      this.compType = compType;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setTimedRollerPool(
+        ScheduledExecutorService timedRollerPool) {
+      this.timedRollerPool = timedRollerPool;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setProxyUser(PrivilegedExecutor proxyUser) {
+      this.proxyUser = proxyUser;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setSinkCounter(SinkCounter sinkCounter) {
+      this.sinkCounter = sinkCounter;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setIdleTimeout(int idleTimeout) {
+      this.idleTimeout = idleTimeout;
+      return this;
+    }
+
+    public BucketWriterBuilder setOnCloseCallback(
+        WriterCallback onCloseCallback) {
+      this.onCloseCallback = onCloseCallback;
+      return this;
+    }
+
+    public BucketWriterBuilder setOnCloseCallbackPath(
+        String onCloseCallbackPath) {
+      this.onCloseCallbackPath = onCloseCallbackPath;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setCallTimeout(long callTimeout) {
+      this.callTimeout = callTimeout;
+      return this;
+    }
+
+    @SuppressWarnings("unused")
+    public BucketWriterBuilder setCallTimeoutPool(
+        ExecutorService callTimeoutPool) {
+      this.callTimeoutPool = callTimeoutPool;
+      return this;
+    }
+
+    public BucketWriterBuilder setRetryInterval(long retryInterval) {
+      this.retryInterval = retryInterval;
+      return this;
+    }
+
+    public BucketWriterBuilder setMaxCloseTries(int maxCloseTries) {
+      this.maxCloseTries = maxCloseTries;
+      return this;
+    }
+
+    public BucketWriterBuilder setWriter(HDFSWriter writer) {
+      this.writer = writer;
+      return this;
+    }
+
+    public BucketWriterBuilder setClock(Clock clock) {
+      this.clock = clock;
+      return this;
+    }
+
+    public BucketWriter build() {
+      if (clock ==  null) {
+        clock = new SystemClock();
+      }
+      if (writer == null) {
+        writer = new MockHDFSWriter();
+      }
+
+      return new BucketWriter(rollInterval, rollSize, rollCount, batchSize,
+          context, filePath, fileName, inUsePrefix, inUseSuffix, fileSuffix,
+          codeC, compType, writer, timedRollerPool, proxyUser, sinkCounter,
+          idleTimeout, onCloseCallback, onCloseCallbackPath, callTimeout,
+          callTimeoutPool, retryInterval, maxCloseTries, clock);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1b437839/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkDeadlock.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkDeadlock.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkDeadlock.java
new file mode 100644
index 0000000..1e1e864
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkDeadlock.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Longs;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.sink.DefaultSinkProcessor;
+import org.apache.flume.source.SequenceGeneratorSource;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestHDFSEventSinkDeadlock {
+
+  public static void main(String... args) {
+    HDFSEventSink sink = new HDFSEventSink();
+    sink.setName("HDFSEventSink");
+
+    Context context = new Context(ImmutableMap.of(
+        "hdfs.path", "file:///tmp/flume-test/bucket-%t",
+        "hdfs.filePrefix", "flumetest",
+        "hdfs.rollInterval", "1",
+        "hdfs.maxOpenFiles", "1",
+        "hdfs.useLocalTimeStamp", "true"));
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    final SequenceGeneratorSource source = new SequenceGeneratorSource();
+    Configurables.configure(source, new Context());
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(Collections.singletonList(channel));
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    sink.setChannel(channel);
+
+    channel.start();
+    source.start();
+
+    SinkProcessor sinkProcessor = new DefaultSinkProcessor();
+    sinkProcessor.setSinks(Collections.singletonList(sink));
+    SinkRunner sinkRunner = new SinkRunner();
+    sinkRunner.setSink(sinkProcessor);
+    sinkRunner.start();
+
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
+
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        int i = 0;
+        while (true) {
+          try {
+            source.process();
+            System.out.println(i++);
+            if (i == 250) {
+              System.out.println("No deadlock found after 250 iterations, exiting");
+              System.exit(0);
+            }
+            Thread.sleep((long) (Math.random() * 100 + 950));
+          } catch (Exception e) {
+            //
+          }
+        }
+      }
+    });
+
+    executor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+        long[] threadIds = bean.findDeadlockedThreads();
+        if (threadIds != null) {
+          System.out.println("Deadlocked threads found");
+          printThreadStackTraces(threadIds);
+          System.exit(1);
+        }
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+  }
+
+  private static void printThreadStackTraces(long[] threadIds) {
+    Set<Long> threadIdSet = new HashSet<>(Longs.asList(threadIds));
+    for (Thread th : Thread.getAllStackTraces().keySet()) {
+      if (threadIdSet.contains(th.getId())) {
+        System.out.println("Thread: " + th);
+        for (StackTraceElement e : th.getStackTrace()) {
+          System.out.println("\t" + e);
+        }
+        System.out.println("-----------------------------");
+      }
+    }
+  }
+}
\ No newline at end of file