Posted to commits@flume.apache.org by mp...@apache.org on 2013/03/01 00:06:59 UTC

git commit: FLUME-1925. HDFS timeouts should not starve other threads.

Updated Branches:
  refs/heads/flume-1.4 bedfc4e9b -> 67d44cd84


FLUME-1925. HDFS timeouts should not starve other threads.

(Hari Shreedharan via Mike Percy)
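
The heart of the change is that the callWithTimeout() helper moves from
HDFSEventSink into BucketWriter, so that every blocking HDFS call (open,
append, sync, close, and the exists/rename pair) is submitted to the sink's
callTimeoutPool and bounded by callTimeout. A minimal standalone sketch of
that pattern follows; the class name, pool size, and 30-second bound are
illustrative, not the patch's exact code:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class CallTimeoutSketch {

      private final ExecutorService callTimeoutPool =
          Executors.newFixedThreadPool(2);
      private final long callTimeout = 30000; // ms; 0 would mean wait forever

      // Run a potentially blocking operation on a pool thread and bound the
      // wait, so one hung HDFS call cannot pin the calling thread forever.
      private <T> T callWithTimeout(Callable<T> callable)
          throws IOException, InterruptedException {
        Future<T> future = callTimeoutPool.submit(callable);
        try {
          if (callTimeout > 0) {
            return future.get(callTimeout, TimeUnit.MILLISECONDS);
          }
          return future.get();
        } catch (TimeoutException te) {
          future.cancel(true); // interrupt the stuck worker thread
          throw new IOException("Callable timed out after " + callTimeout
              + " ms", te);
        } catch (ExecutionException ee) {
          Throwable cause = ee.getCause(); // unwrap to surface the real failure
          if (cause instanceof IOException) {
            throw (IOException) cause;
          } else if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          }
          throw new RuntimeException(ee);
        }
      }

      // Stand-in for a wrapped operation such as writer.append(event) or
      // fileSystem.rename(srcPath, dstPath).
      public String boundedCall() throws IOException, InterruptedException {
        return callWithTimeout(new Callable<String>() {
          @Override
          public String call() throws Exception {
            return "done";
          }
        });
      }

      public static void main(String[] args) throws Exception {
        CallTimeoutSketch sketch = new CallTimeoutSketch();
        System.out.println(sketch.boundedCall()); // completes within the bound
        sketch.callTimeoutPool.shutdown();
      }
    }

A fast call returns its result unchanged; a stuck one is cancelled with
future.cancel(true), which interrupts the worker thread, and the caller gets
an IOException instead of blocking indefinitely. (The actual patch also
handles CancellationException and bumps the sink's failure counter; see the
full callWithTimeout() in the BucketWriter.java diff below.)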


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/67d44cd8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/67d44cd8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/67d44cd8

Branch: refs/heads/flume-1.4
Commit: 67d44cd84074ccfbc0ab295053852ea3acdc52dc
Parents: bedfc4e
Author: Mike Percy <mp...@apache.org>
Authored: Thu Feb 28 15:06:13 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Feb 28 15:06:49 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |  145 +++++++++++----
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   99 +----------
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   16 +-
 3 files changed, 126 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 0786857..cdc37f6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -21,9 +21,14 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.flume.Clock;
@@ -34,7 +39,6 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -93,6 +97,8 @@ class BucketWriter {
   private volatile ScheduledFuture<Void> idleFuture;
   private final WriterCallback onIdleCallback;
   private final String onIdleCallbackPath;
+  private final long callTimeout;
+  private final ExecutorService callTimeoutPool;
 
   private Clock clock = new SystemClock();
 
@@ -101,12 +107,13 @@ class BucketWriter {
   protected boolean idleClosed = false;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, String fileName, String inUsePrefix,
-      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
-      CompressionType compType, HDFSWriter writer,
-      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
-      String onIdleCallbackPath) {
+    Context context, String filePath, String fileName, String inUsePrefix,
+    String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+    CompressionType compType, HDFSWriter writer,
+    ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+    SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+    String onIdleCallbackPath, long callTimeout,
+    ExecutorService callTimeoutPool) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -125,7 +132,8 @@ class BucketWriter {
     this.idleTimeout = idleTimeout;
     this.onIdleCallback = onIdleCallback;
     this.onIdleCallbackPath = onIdleCallbackPath;
-
+    this.callTimeout = callTimeout;
+    this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
@@ -193,7 +201,7 @@ class BucketWriter {
       throw new IOException("Invalid file settings");
     }
 
-    Configuration config = new Configuration();
+    final Configuration config = new Configuration();
     // disable FileSystem JVM shutdown hook
     config.setBoolean("fs.automatic.close", false);
 
@@ -223,16 +231,22 @@ class BucketWriter {
         targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
 
         LOG.info("Creating " + bucketPath);
-        if (codeC == null) {
-          // Need to get reference to FS using above config before underlying
-          // writer does in order to avoid shutdown hook & IllegalStateExceptions
-          fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath);
-        } else {
-          // need to get reference to FS before writer does to avoid shutdown hook
-          fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath, codeC, compType);
-        }
+        callWithTimeout(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            if (codeC == null) {
+              // Need to get reference to FS using above config before underlying
+              // writer does in order to avoid shutdown hook & IllegalStateExceptions
+              fileSystem = new Path(bucketPath).getFileSystem(config);
+              writer.open(bucketPath);
+            } else {
+              // need to get reference to FS before writer does to avoid shutdown hook
+              fileSystem = new Path(bucketPath).getFileSystem(config);
+              writer.open(bucketPath, codeC, compType);
+            }
+            return null;
+          }
+        });
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
         if (ex instanceof IOException) {
@@ -287,11 +301,17 @@ class BucketWriter {
    * doClose() must only be called by close()
    * @throws IOException
    */
-  private void doClose() throws IOException {
+  private void doClose() throws IOException, InterruptedException {
     LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
-        writer.close(); // could block
+        callWithTimeout(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            writer.close(); // could block
+            return null;
+          }
+        });
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
@@ -361,8 +381,14 @@ class BucketWriter {
    * doFlush() must only be called by flush()
    * @throws IOException
    */
-  private void doFlush() throws IOException {
-    writer.sync(); // could block
+  private void doFlush() throws IOException, InterruptedException {
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        writer.sync(); // could block
+        return null;
+      }
+    });
     batchCounter = 0;
   }
 
@@ -378,7 +404,7 @@ class BucketWriter {
    * @throws IOException
    * @throws InterruptedException
    */
-  public synchronized void append(Event event)
+  public synchronized void append(final Event event)
           throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     if (!isOpen) {
@@ -398,7 +424,13 @@ class BucketWriter {
     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
-      writer.append(event); // could block
+      callWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          writer.append(event); // could block
+          return null;
+        }
+      });
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
           bucketPath + ") and rethrowing exception.",
@@ -444,18 +476,24 @@ class BucketWriter {
   /**
    * Rename bucketPath file from .tmp to permanent location.
    */
-  private void renameBucket() throws IOException {
+  private void renameBucket() throws IOException, InterruptedException {
     if(bucketPath.equals(targetPath)) {
       return;
     }
 
-    Path srcPath = new Path(bucketPath);
-    Path dstPath = new Path(targetPath);
+    final Path srcPath = new Path(bucketPath);
+    final Path dstPath = new Path(targetPath);
 
-    if(fileSystem.exists(srcPath)) { // could block
-      LOG.info("Renaming " + srcPath + " to " + dstPath);
-      fileSystem.rename(srcPath, dstPath); // could block
-    }
+    callWithTimeout(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        if(fileSystem.exists(srcPath)) { // could block
+          LOG.info("Renaming " + srcPath + " to " + dstPath);
+          fileSystem.rename(srcPath, dstPath); // could block
+        }
+        return null;
+      }
+    });
   }
 
   @Override
@@ -485,4 +523,47 @@ class BucketWriter {
               + "taking too long.");
     }
   }
+
+  /**
+   * Execute the callable on a separate thread and wait for the completion
+   * for the specified amount of time in milliseconds. In case of timeout
+   * cancel the callable and throw an IOException
+   */
+  private <T> T callWithTimeout(Callable<T> callable)
+    throws IOException, InterruptedException {
+    Future<T> future = callTimeoutPool.submit(callable);
+    try {
+      if (callTimeout > 0) {
+        return future.get(callTimeout, TimeUnit.MILLISECONDS);
+      } else {
+        return future.get();
+      }
+    } catch (TimeoutException eT) {
+      future.cancel(true);
+      sinkCounter.incrementConnectionFailedCount();
+      throw new IOException("Callable timed out after " + callTimeout + " ms",
+        eT);
+    } catch (ExecutionException e1) {
+      sinkCounter.incrementConnectionFailedCount();
+      Throwable cause = e1.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else if (cause instanceof Error) {
+        throw (Error)cause;
+      } else {
+        throw new RuntimeException(e1);
+      }
+    } catch (CancellationException ce) {
+      throw new InterruptedException(
+        "Blocked callable interrupted by rotation event");
+    } catch (InterruptedException ex) {
+      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+      throw ex;
+    }
+  }
+
 }
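
A note on the restructuring above: before this patch, HDFSEventSink wrapped
whole BucketWriter calls (append/flush/close) in callWithTimeout, and
BucketWriter's methods are synchronized; after it, the pool threads execute
only the raw HDFS operations inside those methods. Presumably this is what
the commit title is after: a timed-out, cancelled call now occupies only the
worker thread stuck in the HDFS client, rather than blocking work queued
behind it. Note also that doClose(), doFlush(), and renameBucket() now
declare InterruptedException, since a cancelled or interrupted Future
surfaces to the caller as an interrupt.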

http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 76e3d1f..741ac90 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -26,15 +26,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.TimeZone;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -81,7 +76,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final String defaultInUsePrefix = "";
   private static final String defaultInUseSuffix = ".tmp";
   private static final long defaultBatchSize = 100;
-  private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
 
@@ -336,47 +330,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     return codec;
   }
 
-  /**
-   * Execute the callable on a separate thread and wait for the completion
-   * for the specified amount of time in milliseconds. In case of timeout
-   * cancel the callable and throw an IOException
-   */
-  private <T> T callWithTimeout(Callable<T> callable)
-      throws IOException, InterruptedException {
-    Future<T> future = callTimeoutPool.submit(callable);
-    try {
-      if (callTimeout > 0) {
-        return future.get(callTimeout, TimeUnit.MILLISECONDS);
-      } else {
-        return future.get();
-      }
-    } catch (TimeoutException eT) {
-      future.cancel(true);
-      sinkCounter.incrementConnectionFailedCount();
-      throw new IOException("Callable timed out after " + callTimeout + " ms",
-          eT);
-    } catch (ExecutionException e1) {
-      sinkCounter.incrementConnectionFailedCount();
-      Throwable cause = e1.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else if (cause instanceof InterruptedException) {
-        throw (InterruptedException) cause;
-      } else if (cause instanceof RuntimeException) {
-        throw (RuntimeException) cause;
-      } else if (cause instanceof Error) {
-        throw (Error)cause;
-      } else {
-        throw new RuntimeException(e1);
-      }
-    } catch (CancellationException ce) {
-      throw new InterruptedException(
-          "Blocked callable interrupted by rotation event");
-    } catch (InterruptedException ex) {
-      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
-      throw ex;
-    }
-  }
 
   /**
    * Pull events out of channel and send it to HDFS. Take at most batchSize
@@ -423,7 +376,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
               suffix, codeC, compType, hdfsWriter, timedRollerPool,
-              proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
+              proxyTicket, sinkCounter, idleTimeout, idleCallback,
+              lookupPath, callTimeout, callTimeoutPool);
 
           sfWriters.put(lookupPath, bucketWriter);
         }
@@ -434,7 +388,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }
 
         // Write the data to HDFS
-        append(bucketWriter, event);
+        bucketWriter.append(event);
       }
 
       if (txnEventCount == 0) {
@@ -447,7 +401,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       // flush all pending buckets before committing the transaction
       for (BucketWriter bucketWriter : writers) {
-        flush(bucketWriter);
+        bucketWriter.flush();
       }
 
       transaction.commit();
@@ -482,7 +436,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       LOG.info("Closing {}", entry.getKey());
 
       try {
-        close(entry.getValue());
+        entry.getValue().close();
       } catch (Exception ex) {
         LOG.warn("Exception while closing " + entry.getKey() + ". " +
                 "Exception follows.", ex);
@@ -727,49 +681,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
             " }";
   }
 
-  /**
-   * Append to bucket writer with timeout enforced
-   */
-  private void append(final BucketWriter bucketWriter, final Event event)
-          throws IOException, InterruptedException {
-
-    // Write the data to HDFS
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.append(event);
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Flush bucket writer with timeout enforced
-   */
-  private void flush(final BucketWriter bucketWriter)
-          throws IOException, InterruptedException {
-
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.flush();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Close bucket writer with timeout enforced
-   */
-  private void close(final BucketWriter bucketWriter)
-          throws IOException, InterruptedException {
-
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.close();
-        return null;
-      }
-    });
-  }
-
   @VisibleForTesting
   void setBucketClock(Clock clock) {
     BucketPath.setClock(clock);

http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index ebe277c..99b6150 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -69,7 +69,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        null, null);
+        null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -93,7 +93,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -119,7 +119,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -205,7 +205,7 @@ public class TestBucketWriter {
         path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -228,7 +228,7 @@ public class TestBucketWriter {
           "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
           timedRollerPool, null,
           new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-          0, null, null);
+          0, null, null, 30000, Executors.newSingleThreadExecutor());
 
       // Need to override system time use for test so we know what to expect
       final long testTime = System.currentTimeMillis();
@@ -255,7 +255,7 @@ public class TestBucketWriter {
             "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
             timedRollerPool, null,
             new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-            0, null, null);
+            0, null, null, 30000, Executors.newSingleThreadExecutor());
 
         // Need to override system time use for test so we know what to expect
 
@@ -285,7 +285,7 @@ public class TestBucketWriter {
         "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -304,7 +304,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
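
For operators, the 30000 ms threaded through the test changes above
corresponds to the sink's call-timeout setting. Assuming the standard
properties-file configuration for the HDFS sink (hdfs.callTimeout in
milliseconds, hdfs.threadsPoolSize sizing the callTimeoutPool), the knobs
look like this; property names per the Flume User Guide, values illustrative:

    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.callTimeout = 30000
    agent.sinks.hdfsSink.hdfs.threadsPoolSize = 10

A callTimeout of 0 (or less) disables the bound: callWithTimeout falls
through to a plain future.get() and waits indefinitely.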