You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/21 18:25:22 UTC

svn commit: r1341096 - in /incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src: main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/

Author: arvind
Date: Mon May 21 16:25:21 2012
New Revision: 1341096

URL: http://svn.apache.org/viewvc?rev=1341096&view=rev
Log:
FLUME-1219. Race conditions in BucketWriter and HDFSEventSink

(Mike Percy via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Mon May 21 16:25:21 2012
@@ -21,7 +21,6 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flume.Context;
-
 import org.apache.flume.Event;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
@@ -29,11 +28,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BucketWriter {
-
+/**
+ * Internal API intended for HDFSSink use.
+ * This class does file rolling and handles file formats and serialization.
+ * The methods in this class are NOT THREAD SAFE.
+ */
+class BucketWriter {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(BucketWriter.class);
@@ -50,16 +54,18 @@ public class BucketWriter {
   private long eventCounter;
   private long processSize;
   private long lastRollTime;
-  private long batchCounter;
-  private String filePath;
   private long rollInterval;
   private long rollSize;
   private long rollCount;
   private long batchSize;
   private CompressionCodec codeC;
   private CompressionType compType;
-  private String bucketPath;
+  private FileSystem fileSystem;
   private Context context;
+  private volatile String filePath;
+  private volatile String bucketPath;
+  private volatile long batchCounter;
+  private volatile boolean isOpen;
 
   // clear the class counters
   private void resetCounters() {
@@ -69,105 +75,145 @@ public class BucketWriter {
     batchCounter = 0;
   }
 
-  // constructor. initialize the thresholds and open the file handle
-  public BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize,
-      Context ctx) throws IOException {
+  BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize,
+      Context ctx, String fPath, CompressionCodec codec, CompressionType cType,
+      HDFSWriter hWriter, FlumeFormatter fmt) {
     rollInterval = rollInt;
     rollSize = rollSz;
     rollCount = rollCnt;
     batchSize = bSize;
     context = ctx;
+    filePath = fPath;
+    codeC = codec;
+    compType = cType;
+    writer = hWriter;
+    formatter = fmt;
+    isOpen = false;
 
-    resetCounters();
-    // open();
+    writer.configure(context);
   }
 
-  public void open() throws IOException {
+  /**
+   * open() is called by append()
+   * WARNING: acquires a lock on the logged-in Kerberos user object!
+   * @throws IOException
+   */
+  private void open() throws IOException {
     if ((filePath == null) || (writer == null) || (formatter == null)) {
       throw new IOException("Invalid file settings");
     }
 
-    writer.configure(context);
-
     long counter = fileExentionCounter.incrementAndGet();
-    if (codeC == null) {
-      bucketPath = filePath + "." + counter;
-      writer.open(bucketPath + IN_USE_EXT, formatter);
-    } else {
-      bucketPath = filePath + "." + counter
-          + codeC.getDefaultExtension();
-      writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
-    }
-    batchCounter = 0;
-    LOG.info("Creating " + bucketPath + IN_USE_EXT);
-  }
-
-  public void open(String fPath, HDFSWriter hWriter, FlumeFormatter fmt)
-      throws IOException {
-    open(fPath, null, CompressionType.NONE, hWriter, fmt);
-  }
 
-  public void open(String fPath, CompressionCodec codec, CompressionType cType,
-      HDFSWriter hWriter, FlumeFormatter fmt) throws IOException {
-    filePath = fPath;
-    codeC = codec;
-    compType = cType;
-    writer = hWriter;
-    formatter = fmt;
-    open();
-  }
+    Configuration config = new Configuration();
+    // disable FileSystem JVM shutdown hook
+    config.setBoolean("fs.automatic.close", false);
+
+    // Hadoop is not thread safe when doing certain RPC operations,
+    // including getFileSystem(), when running under Kerberos
+    UserGroupInformation staticLogin = UserGroupInformation.getLoginUser();
+    synchronized (staticLogin) {
+      if (codeC == null) {
+        bucketPath = filePath + "." + counter;
+        // need to get reference to FS before writer does to avoid shutdown hook
+        fileSystem = new Path(bucketPath).getFileSystem(config);
+        LOG.info("Creating " + bucketPath + IN_USE_EXT);
+        writer.open(bucketPath + IN_USE_EXT, formatter);
+      } else {
+        bucketPath = filePath + "." + counter
+            + codeC.getDefaultExtension();
+        // need to get reference to FS before writer does to avoid shutdown hook
+        fileSystem = new Path(bucketPath).getFileSystem(config);
+        LOG.info("Creating " + bucketPath + IN_USE_EXT);
+        writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+      }
+    }
 
-  // close the file handle
-  public void close() throws IOException {
-    LOG.debug("Closing " + bucketPath);
     resetCounters();
-    if (writer != null) {
-      writer.close(); // could block
-    }
-    renameBucket();
+    isOpen = true;
   }
 
-  // close the file, ignore the IOException
-  // ideally the underlying writer should discard unwritten data
-  public void abort() {
-    try {
-      close();
-    } catch (IOException ex) {
-      LOG.info("Exception during close on abort", ex);
+  /**
+   * Close the file handle and rename the temp file to the permanent filename.
+   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+   * @throws IOException On failure to rename if temp file exists.
+   */
+  public synchronized void close() throws IOException {
+    LOG.debug("Closing {}", bucketPath + IN_USE_EXT);
+    if (isOpen) {
+      try {
+        writer.close(); // could block
+      } catch (IOException e) {
+        LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
+            IN_USE_EXT + "). Exception follows.", e);
+      }
+      isOpen = false;
+    } else {
+      LOG.info("HDFSWriter is already closed: {}", bucketPath + IN_USE_EXT);
     }
-    try {
-      open();
-    } catch (IOException ex) {
-      LOG.warn("Exception during opem on abort", ex);
+    if (bucketPath != null && fileSystem != null) {
+      renameBucket(); // could block or throw IOException
+      fileSystem = null;
     }
   }
 
-  // flush the data
-  public void flush() throws IOException {
+  /**
+   * flush the data
+   */
+  public synchronized void flush() throws IOException {
     writer.sync(); // could block
     batchCounter = 0;
   }
 
-  // append the data, update stats, handle roll and batching
-  public void append(Event e) throws IOException {
-    writer.append(e, formatter); // could block
+  /**
+   * Open file handles, write data, update stats, handle file rolling and
+   * batching / flushing. <br />
+   * If the write fails, the file is implicitly closed and then the IOException
+   * is rethrown. <br />
+   * We rotate before append, and not after, so that the lastRollTime counter
+   * that is reset by the open() call approximately reflects when the first
+   * event was written to it.
+   */
+  public synchronized void append(Event event) throws IOException {
+    if (!isOpen) {
+      open();
+    }
+
+    // check if it's time to rotate the file
+    if (shouldRotate()) {
+      close();
+      open();
+    }
+
+    // write the event
+    try {
+      writer.append(event, formatter); // could block
+    } catch (IOException e) {
+      LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
+          bucketPath + IN_USE_EXT + ") and rethrowing exception.",
+          e.getMessage());
+      try {
+        close();
+      } catch (IOException e2) {
+        LOG.warn("Caught IOException while closing file (" +
+             bucketPath + IN_USE_EXT + "). Exception follows.", e2);
+      }
+      throw e;
+    }
 
     // update statistics
-    processSize += e.getBody().length;
+    processSize += event.getBody().length;
     eventCounter++;
     batchCounter++;
 
-    // check if its time to rotate the file
-    if (shouldRotate()) {
-      close();
-      open();
-    } else if ((batchCounter == batchSize)) {
+    if (batchCounter == batchSize) {
       flush();
     }
-
   }
 
-  // check if time to rotate the file
+  /**
+   * check if time to rotate the file
+   */
   private boolean shouldRotate() {
     boolean doRotate = false;
 
@@ -190,19 +236,16 @@ public class BucketWriter {
     return doRotate;
   }
 
-  public String getFilePath() {
-    return filePath;
-  }
-
+  /**
+   * Rename bucketPath file from .tmp to permanent location.
+   */
   private void renameBucket() throws IOException {
-    Configuration conf = new Configuration();
     Path srcPath = new Path(bucketPath + IN_USE_EXT);
     Path dstPath = new Path(bucketPath);
-    FileSystem hdfs = dstPath.getFileSystem(conf);
 
-    if(hdfs.exists(srcPath)) { // could block
+    if(fileSystem.exists(srcPath)) { // could block
       LOG.info("Renaming " + srcPath + " to " + dstPath);
-      hdfs.rename(srcPath, dstPath); // could block
+      fileSystem.rename(srcPath, dstPath); // could block
     }
   }
 

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Mon May 21 16:25:21 2012
@@ -39,7 +39,6 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
@@ -71,16 +70,10 @@ public class HDFSEventSink extends Abstr
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
   /**
-   * Default length of time we wait for an append
-   * before closing the file and moving on.
+   * Default length of time we wait for blocking BucketWriter calls
+   * before timing out the operation. Intended to prevent server hangs.
    */
-  private static final long defaultAppendTimeout = 1000;
-  /**
-   * Default length of time we for a non-append
-   * before closing the file and moving on. This
-   * includes open/close/flush.
-   */
-  private static final long defaultCallTimeout = 5000;
+  private static final long defaultCallTimeout = 10000;
   /**
    * Default number of threads available for tasks
    * such as append/open/close/flush with hdfs.
@@ -113,7 +106,6 @@ public class HDFSEventSink extends Abstr
   private int maxOpenFiles;
   private String writeFormat;
   private ExecutorService executor;
-  private long appendTimeout;
 
   private String kerbConfPrincipal;
   private String kerbKeytab;
@@ -213,7 +205,6 @@ public class HDFSEventSink extends Abstr
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
     writeFormat = context.getString("hdfs.writeFormat");
-    appendTimeout = context.getLong("hdfs.appendTimeout", defaultAppendTimeout);
     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
@@ -390,15 +381,16 @@ public class HDFSEventSink extends Abstr
    * Pull events out of channel and send it to HDFS - take at the most
    * txnEventMax, that's the maximum #events to hold in channel for a given
    * transaction - find the corresponding bucket for the event, ensure the file
-   * is open - extract the pay-load and append to HDFS file
+   * is open - extract the pay-load and append to HDFS file <br />
+   * WARNING: NOT THREAD SAFE
    */
   @Override
   public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     List<BucketWriter> writers = Lists.newArrayList();
+    transaction.begin();
     try {
-      transaction.begin();
       Event event = null;
       for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
         event = channel.take();
@@ -413,26 +405,15 @@ public class HDFSEventSink extends Abstr
 
         // we haven't seen this file yet, so open it and cache the handle
         if (bucketWriter == null) {
-          final HDFSWriter writer = writerFactory.getWriter(fileType);
-          final FlumeFormatter formatter = HDFSFormatterFactory
+
+          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
+          FlumeFormatter formatter = HDFSFormatterFactory
               .getFormatter(writeFormat);
-          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, batchSize, context);
-          final BucketWriter callableWriter = bucketWriter;
-          final String callablePath = realPath;
-          final CompressionCodec callableCodec = codeC;
-          final CompressionType callableCompType = compType;
-
-          callWithTimeout(executor, callTimeout,
-              new ProxyCallable<Void>(proxyTicket) {
-            @Override
-            public Void doCall() throws Exception {
-              synchronized(callableWriter) {
-                callableWriter.open(callablePath, callableCodec,
-                    callableCompType, writer, formatter);
-              }
-              return null;
-            }
-          });
+
+          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
+              batchSize, context, realPath, codeC, compType, hdfsWriter,
+              formatter);
+
           sfWriters.put(realPath, bucketWriter);
         }
 
@@ -444,19 +425,28 @@ public class HDFSEventSink extends Abstr
         // Write the data to HDFS
         final BucketWriter callableWriter = bucketWriter;
         final Event callableEvent = event;
-        callWithTimeout(executor, appendTimeout,
+        callWithTimeout(executor, callTimeout,
             new ProxyCallable<Void>(proxyTicket) {
           @Override
           public Void doCall() throws Exception {
-            synchronized(callableWriter) {
-              try {
-                callableWriter.append(callableEvent);
-              } catch(IOException ex) {
-                callableWriter.abort(); // close/open
-                callableWriter.append(callableEvent); // retry
-              }
-              return null;
-            }
+            callableWriter.append(callableEvent);
+            return null;
+          }
+        });
+      }
+
+      // flush all pending buckets before committing the transaction
+      for (BucketWriter writer : writers) {
+        if (writer.isBatchComplete()) {
+          continue;
+        }
+        final BucketWriter callableWriter = writer;
+        callWithTimeout(executor, callTimeout,
+            new ProxyCallable<Void>(proxyTicket) {
+          @Override
+          public Void doCall() throws Exception {
+            callableWriter.flush();
+            return null;
           }
         });
       }
@@ -479,45 +469,28 @@ public class HDFSEventSink extends Abstr
         throw new EventDeliveryException(th);
       }
     } finally {
-      // flush the buckets that still has pending data
-      // this ensures that the data removed from channel
-      // by the current transaction is safely on disk
-      for (BucketWriter writer : writers) {
-        if (writer.isBatchComplete()) {
-          continue;
-        }
-        final BucketWriter callableWriter = writer;
-        callWithTimeoutLogError(executor, callTimeout, "flush on "
-        + callableWriter, new ProxyCallable<Void>(proxyTicket) {
-          @Override
-          public Void doCall() throws Exception {
-            synchronized(callableWriter) {
-              callableWriter.flush();
-            }
-            return null;
-          }
-        });
-      }
       transaction.close();
     }
   }
 
   @Override
   public void stop() {
-    for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
-      LOG.info("Closing " + e.getKey());
-      final BucketWriter callableWriter = e.getValue();
-      callWithTimeoutLogError(executor, callTimeout, "close on " + e.getKey(),
-          new ProxyCallable<Void>(proxyTicket) {
+    // do not constrain close() calls with a timeout
+    for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
+      LOG.info("Closing " + entry.getKey());
+      final BucketWriter callableWriter = entry.getValue();
+      callWithTimeoutLogError(executor, callTimeout, "close on " +
+          entry.getKey(), new ProxyCallable<Void>(proxyTicket) {
+
         @Override
         public Void doCall() throws Exception {
-          synchronized(callableWriter) {
-            callableWriter.close();
-          }
+          callableWriter.close();
           return null;
         }
       });
     }
+
+    sfWriters.clear();
     executor.shutdown();
     try {
       while (executor.isTerminated() == false) {
@@ -534,31 +507,6 @@ public class HDFSEventSink extends Abstr
   @Override
   public void start() {
     executor = Executors.newFixedThreadPool(threadsPoolSize);
-    // FIXME: if restarted, the code below reopens the writers from the
-    // previous "process" executions. Is this what we want? Why
-    // not clear this list during close since the appropriate
-    // writers will be opened in "process"?
-    // TODO: check if this has anything to do with renaming .tmp files.
-    // If not, remove this code.
-    for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
-      final BucketWriter callableWriter = entry.getValue();
-      try {
-        callWithTimeout(executor, callTimeout,
-            new ProxyCallable<Void>(proxyTicket) {
-          @Override
-          public Void doCall() throws Exception {
-            synchronized(callableWriter) {
-              callableWriter.open();
-            }
-            return null;
-          }
-        });
-      } catch (IOException e) {
-        throw new FlumeException("Exception opening HDFS file.", e);
-      } catch (InterruptedException e) {
-        throw new FlumeException("Interrupt while opening HDFS file.", e);
-      }
-    }
     super.start();
   }
 
@@ -624,12 +572,15 @@ public class HDFSEventSink extends Abstr
       if (curUser == null || !curUser.getUserName().equals(principal)) {
         try {
           // static login
-          kerberosLogin(principal, kerbKeytab);
+          kerberosLogin(this, principal, kerbKeytab);
         } catch (IOException e) {
           LOG.error("Authentication or file read error while attempting to "
               + "login as kerberos principal (" + principal + ") using "
               + "keytab (" + kerbKeytab + "). Exception follows.", e);
+          return false;
         }
+      } else {
+        LOG.debug("{}: Using existing principal login: {}", this, curUser);
       }
 
       // we supposedly got through this unscathed... so store the static user
@@ -706,7 +657,7 @@ public class HDFSEventSink extends Abstr
    * @throws IOException if login fails.
    */
   private static synchronized UserGroupInformation kerberosLogin(
-      String principal, String keytab) throws IOException {
+      HDFSEventSink sink, String principal, String keytab) throws IOException {
 
     // if we are the 2nd user thru the lock, the login should already be
     // available statically if login was successful
@@ -719,11 +670,16 @@ public class HDFSEventSink extends Abstr
       LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
     }
 
-    // this means we really actually have not logged in successfully yet
-    if (curUser == null || !curUser.getUserName().equals(principal)) {
+    // we already have logged in successfully
+    if (curUser != null && curUser.getUserName().equals(principal)) {
+      LOG.debug("{}: Using existing principal ({}): {}",
+          new Object[] { sink, principal, curUser });
+
+    // no principal found
+    } else {
 
-      LOG.info("Attempting kerberos login as principal (" + principal + ") " +
-          "from keytab file (" + keytab + ")");
+      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
+          "file ({})", new Object[] { sink, principal, keytab });
 
       // attempt static kerberos login
       UserGroupInformation.loginUserFromKeytab(principal, keytab);
@@ -733,4 +689,10 @@ public class HDFSEventSink extends Abstr
     return curUser;
   }
 
+  @Override
+  public String toString() {
+    return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
+        " }";
+  }
+
 }

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java Mon May 21 16:25:21 2012
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.io.SequenceFile;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,19 +36,14 @@ public class TestBucketWriter {
       LoggerFactory.getLogger(TestBucketWriter.class);
   private Context ctx = new Context();
 
-  @Before
-  public void setup() {
-
-  }
-
   @Test
   public void testEventCountingRoller() throws IOException {
     int maxEvents = 100;
-    BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx);
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
-
-    bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+    BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
+        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -59,19 +54,19 @@ public class TestBucketWriter {
     logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
     logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
 
-    Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
-    Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
-    Assert.assertEquals(hdfsWriter.getFilesOpened(), 11);
+    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+    Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
   }
 
   @Test
   public void testSizeRoller() throws IOException {
     int maxBytes = 300;
-    BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx);
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
-
-    bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+    BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
+        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -82,19 +77,19 @@ public class TestBucketWriter {
     logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
     logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
 
-    Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
-    Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
-    Assert.assertEquals(hdfsWriter.getFilesOpened(), 11);
+    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+    Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
   }
 
   @Test
   public void testIntervalRoller() throws IOException, InterruptedException {
     int rollInterval = 2; // seconds
-    BucketWriter bucketWriter = new BucketWriter(rollInterval, 0, 0, 0, ctx);
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
-
-    bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+    BucketWriter bucketWriter = new BucketWriter(rollInterval, 0, 0, 0, ctx,
+        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -114,9 +109,10 @@ public class TestBucketWriter {
     logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
     logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
 
-    Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
-    Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
-    Assert.assertEquals(hdfsWriter.getFilesOpened(), elapsedSeconds/2 + 1);
+    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+    Assert.assertEquals("files opened", elapsedSeconds/2 + 1,
+        hdfsWriter.getFilesOpened());
   }
 
 }

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Mon May 21 16:25:21 2012
@@ -17,9 +17,6 @@
  */
 package org.apache.flume.sink.hdfs;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -57,8 +54,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,8 +176,7 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    // Note that we'll end up with one last file with only header
-    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
     // check the contents of the all files
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
@@ -295,8 +289,7 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    // Note that we'll end up with one last file with only header
-    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -374,8 +367,7 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    // Note that we'll end up with one last file with only header
-    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
     verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -450,8 +442,7 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    // Note that we'll end up with two files which has have a header
-    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -623,13 +614,21 @@ public class TestHDFSEventSink {
         BytesWritable value = new BytesWritable();
         while(reader.next(key, value)) {
           String body = new String(value.getBytes(), 0, value.getLength());
-          bodies.remove(body);
-          found++;
+          if (bodies.contains(body)) {
+            LOG.debug("Found event body: {}", body);
+            bodies.remove(body);
+            found++;
+          }
         }
         reader.close();
       }
     }
-    assertTrue("Found = " + found + ", Expected = "  +
+    if (!bodies.isEmpty()) {
+      for (String body : bodies) {
+        LOG.error("Never found event body: {}", body);
+      }
+    }
+    Assert.assertTrue("Found = " + found + ", Expected = "  +
         expected + ", Left = " + bodies.size() + " " + bodies,
           bodies.size() == 0);
 
@@ -651,7 +650,7 @@ public class TestHDFSEventSink {
         reader.close();
       }
     }
-    assertTrue("Found = " + found + ", Expected = "  +
+    Assert.assertTrue("Found = " + found + ", Expected = "  +
         expected + ", Left = " + bodies.size() + " " + bodies,
           bodies.size() == 0);
 
@@ -681,25 +680,27 @@ public class TestHDFSEventSink {
         input.close();
       }
     }
-    assertTrue("Found = " + found + ", Expected = "  +
+    Assert.assertTrue("Found = " + found + ", Expected = "  +
         expected + ", Left = " + bodies.size() + " " + bodies,
           bodies.size() == 0);
   }
 
   /**
    * Ensure that when a write throws an IOException we are
-   * able to continue to progress (via close/open).
+   * able to continue to progress in the next process() call.
+   * This relies on Transactional rollback semantics for durability and
+   * the behavior of the BucketWriter class of close()ing upon IOException.
    */
   @Test
   public void testCloseReopen() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
+    final int numBatches = 4;
     final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
-    final int numBatches = 4;
     String newPath = testPath + "/singleBucket";
     int i = 1, j = 1;
 
@@ -724,29 +725,8 @@ public class TestHDFSEventSink {
 
     Configurables.configure(sink, context);
 
-    Channel channel = mock(Channel.class);
-    final List<Event> events = Lists.newArrayList();
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        events.add((Event)args[0]);
-        return null;
-      }
-    }).when(channel).put(any(Event.class));
-
-    when(channel.take()).then(new Answer<Event>() {
-      @Override
-      public Event answer(InvocationOnMock invocation) throws Throwable {
-        if(events.isEmpty()) {
-          return null;
-        }
-        return events.remove(0);
-      }
-    });
-    when(channel.getTransaction()).thenReturn(mock(Transaction.class));
-
-    Configurables.configure(channel, context);
+    MemoryChannel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
 
     sink.setChannel(channel);
     sink.start();
@@ -755,19 +735,25 @@ public class TestHDFSEventSink {
     List<String> bodies = Lists.newArrayList();
     // push the event batches into channel
     for (i = 1; i < numBatches; i++) {
-      for (j = 1; j <= txnMax; j++) {
-        Event event = new SimpleEvent();
-        eventDate.clear();
-        eventDate.set(2011, i, i, i, 0); // yy mm dd
-        event.getHeaders().put("timestamp",
-            String.valueOf(eventDate.getTimeInMillis()));
-        event.getHeaders().put("hostname", "Host" + i);
-        String body = "Test." + i + "." + j;
-        event.setBody(body.getBytes());
-        bodies.add(body);
-        // inject fault
-        event.getHeaders().put("fault-until-reopen", "");
-        channel.put(event);
+      channel.getTransaction().begin();
+      try {
+        for (j = 1; j <= txnMax; j++) {
+          Event event = new SimpleEvent();
+          eventDate.clear();
+          eventDate.set(2011, i, i, i, 0); // yy mm dd
+          event.getHeaders().put("timestamp",
+              String.valueOf(eventDate.getTimeInMillis()));
+          event.getHeaders().put("hostname", "Host" + i);
+          String body = "Test." + i + "." + j;
+          event.setBody(body.getBytes());
+          bodies.add(body);
+          // inject fault
+          event.getHeaders().put("fault-until-reopen", "");
+          channel.put(event);
+        }
+        channel.getTransaction().commit();
+      } finally {
+        channel.getTransaction().close();
       }
       LOG.info("execute sink to process the events: " + sink.process());
     }
@@ -812,6 +798,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.callTimeout", Long.toString(1000));
     Configurables.configure(sink, context);
 
     Channel channel = new MemoryChannel();