Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/12 21:35:58 UTC

svn commit: r1182552 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/channel/ flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/ flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/...

Author: arvind
Date: Wed Oct 12 19:35:57 2011
New Revision: 1182552

URL: http://svn.apache.org/viewvc?rev=1182552&view=rev
Log:
FLUME-788. Add more test cases for Flume NG HDFS sink.

(Prasad Mujumdar via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java Wed Oct 12 19:35:57 2011
@@ -226,7 +226,8 @@ public class MultiOpMemChannel implement
     StampedEvent undoEvent;
     StampedEvent currentEvent;
 
-    while ((undoEvent = myTxn.getUndoPutList().removeLast()) != null) {
+    while ((myTxn.getUndoPutList().isEmpty()) == false) {
+      undoEvent = myTxn.getUndoPutList().removeLast();
       currentEvent = queue.removeLast();
       Preconditions.checkNotNull(currentEvent, "Rollback error");
       Preconditions.checkArgument(currentEvent == undoEvent ,
@@ -267,8 +268,9 @@ public class MultiOpMemChannel implement
    */
   protected void undoTake(MemTransaction myTxn) {
     StampedEvent e;
- 
-    while ((e = myTxn.getUndoTakeList().removeLast()) != null) {
+
+    while (myTxn.getUndoTakeList().isEmpty() == false) {
+      e = myTxn.getUndoTakeList().removeLast();
       queue.addFirst(e);
     }
   }
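
The rewritten undo loops above matter because draining a list by checking removeLast() for null never terminates cleanly if the undo lists are Deque-backed (e.g. java.util.LinkedList): removeLast() throws NoSuchElementException on an empty deque instead of returning null. A minimal standalone sketch of the old and new drain patterns, assuming a Deque-backed list:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.NoSuchElementException;

    public class DrainSketch {
      public static void main(String[] args) {
        Deque<String> undo = new ArrayDeque<String>();
        undo.addLast("event-1");
        undo.addLast("event-2");

        // Pattern used after this change: guard with isEmpty() before removing.
        while (!undo.isEmpty()) {
          String e = undo.removeLast();
          System.out.println("rolled back " + e);
        }

        // The old pattern ends in an exception rather than a null result.
        try {
          undo.removeLast();
        } catch (NoSuchElementException expected) {
          System.out.println("removeLast() on an empty deque throws");
        }
      }
    }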

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Wed Oct 12 19:35:57 2011
@@ -105,9 +105,11 @@ public class BucketWriter {
   }
 
   // close the file, ignore the IOException
+  // ideally the underlying writer should discard unwritten data
   public void abort() {
     try {
       close();
+      open();
     } catch (IOException eIO) {
       // Ignore it
     }
@@ -167,11 +169,11 @@ public class BucketWriter {
     if ((rollInterval > 0)
         && (rollInterval < (System.currentTimeMillis() - lastProcessTime) / 1000))
       doRotate = true;
-    if ((rollCount > 0) && (rollCount < eventCounter)) {
+    if ((rollCount > 0) && (rollCount <= eventCounter)) {
       eventCounter = 0;
       doRotate = true;
     }
-    if ((rollSize > 0) && (rollSize < processSize)) {
+    if ((rollSize > 0) && (rollSize <= processSize)) {
       processSize = 0;
       doRotate = true;
     }
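
Two things change in BucketWriter: abort() now reopens the bucket after closing it, so the writer is usable again after a failed flush, and the roll thresholds moved from strict less-than to less-than-or-equal, so a file rolls as soon as the configured event count or byte size is reached rather than one event or byte later. A simplified standalone sketch of the new roll check (field names mirror the diff; this is not the full method):

    public class RollCheckSketch {
      static boolean shouldRotate(long rollCount, long eventCounter,
                                  long rollSize, long processSize) {
        boolean doRotate = false;
        if ((rollCount > 0) && (rollCount <= eventCounter)) {
          doRotate = true;  // rollCount=3 now rolls on the 3rd event, not the 4th
        }
        if ((rollSize > 0) && (rollSize <= processSize)) {
          doRotate = true;  // rolls once the written size reaches the limit
        }
        return doRotate;
      }

      public static void main(String[] args) {
        System.out.println(shouldRotate(3, 3, 0, 0));  // true after this change
        System.out.println(shouldRotate(3, 2, 0, 0));  // false
      }
    }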

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Oct 12 19:35:57 2011
@@ -71,6 +71,7 @@ public class HDFSEventSink extends Abstr
   private String path;
   private int maxOpenFiles;
   private String writeFormat;
+  private HDFSWriterFactory myWriterFactory; 
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue We want to clear
@@ -106,7 +107,11 @@ public class HDFSEventSink extends Abstr
   // private boolean shouldSub = false;
 
   public HDFSEventSink() {
-
+    myWriterFactory = new HDFSWriterFactory();
+  }
+  
+  public HDFSEventSink(HDFSWriterFactory newWriterFactory) {
+    myWriterFactory = newWriterFactory;
   }
 
   // read configuration and setup thresholds
@@ -267,7 +272,7 @@ public class HDFSEventSink extends Abstr
 
         // we haven't seen this file yet, so open it and cache the handle
         if (bw == null) {
-          HDFSWriter writer = HDFSWriterFactory.getWriter(fileType);
+          HDFSWriter writer = myWriterFactory.getWriter(fileType);
           FlumeFormatter formatter = HDFSFormatterFactory
               .getFormatter(writeFormat);
           bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
@@ -296,7 +301,7 @@ public class HDFSEventSink extends Abstr
       return Status.READY;
     } catch (IOException eIO) {
       transaction.rollback();
-      LOG.error("HDFS IO error", eIO);
+      LOG.warn("HDFS IO error", eIO);
       return Status.BACKOFF;
     } catch (Exception e) {
       transaction.rollback();
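
The new constructor is straightforward dependency injection: the no-arg constructor keeps the default HDFSWriterFactory, while a test can hand in a factory that returns fault-injecting writers. A minimal usage sketch, mirroring testBadSimpleAppend below:

    // Default wiring, behavior unchanged:
    HDFSEventSink sink = new HDFSEventSink();

    // Test wiring: swap in the fault-injecting factory added in this commit.
    HDFSEventSink faultySink = new HDFSEventSink(new HDFSBadWriterFactory());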

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java Wed Oct 12 19:35:57 2011
@@ -47,7 +47,9 @@ public class HDFSTextFormatter implement
 
   @Override
   public byte[] getBytes(Event e) {
-    return makeText(e).getBytes();
+    Text record = makeText(e);
+    record.append("\n".getBytes(), 0, 1);
+    return record.getBytes();
   }
 
 }
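
Appending a newline to every record makes a text DataStream file line-oriented, which is what the new testTextAppend relies on when it reads files back with BufferedReader.readLine(). One caveat when handling org.apache.hadoop.io.Text this way: getBytes() returns the backing buffer, and only the first getLength() bytes are valid. A small sketch of the effect:

    import org.apache.hadoop.io.Text;

    public class NewlineSketch {
      public static void main(String[] args) {
        Text record = new Text("Test.1.1");
        record.append("\n".getBytes(), 0, 1);
        // Only the first getLength() bytes of the backing array are valid.
        System.out.print(new String(record.getBytes(), 0, record.getLength()));
      }
    }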

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java Wed Oct 12 19:35:57 2011
@@ -20,7 +20,7 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
 
-abstract class HDFSWriterFactory {
+public class HDFSWriterFactory {
   static final String SequenceFileType = "SequenceFile";
   static final String DataStreamType = "DataStream";
   static final String CompStreamType = "CompressedStream";
@@ -29,7 +29,7 @@ abstract class HDFSWriterFactory {
 
   }
 
-  public static HDFSWriter getWriter(String fileType) throws IOException {
+  public HDFSWriter getWriter(String fileType) throws IOException {
     if (fileType == SequenceFileType) {
       return new HDFSSequenceFile();
     } else if (fileType == DataStreamType) {
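
Making the factory a concrete class with an instance getWriter() is what lets HDFSBadWriterFactory (added below) substitute its own writers. Any subclass can do the same; a hypothetical example in the same package:

    package org.apache.flume.sink.hdfs;

    import java.io.IOException;

    // Hypothetical factory override enabled by this change (not part of the commit).
    public class LoggingWriterFactory extends HDFSWriterFactory {
      @Override
      public HDFSWriter getWriter(String fileType) throws IOException {
        System.out.println("creating writer for file type " + fileType);
        return super.getWriter(fileType);
      }
    }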

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.flume.sink.hdfs.HDFSSequenceFile;
+
+
+public class HDFSBadDataStream extends HDFSDataStream {
+  public class HDFSBadSeqWriter extends HDFSSequenceFile {
+    @Override
+    public void append(Event e, FlumeFormatter fmt) throws IOException {
+
+      if (e.getHeaders().containsKey("fault")) {
+        throw new IOException("Injected fault");
+      }
+      super.append(e, fmt);
+    }
+
+  }
+
+}
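
Note that HDFSBadDataStream as added only nests a fault-injecting sequence-file writer; nothing instantiates that inner class, and the stream's own append() is inherited unchanged from HDFSDataStream, so the "fault" header is honored only by the standalone HDFSBadSeqWriter below. If fault injection is wanted on the data stream itself, a direct override would look roughly like this hypothetical sketch (assuming HDFSDataStream exposes the same append(Event, FlumeFormatter) signature):

    package org.apache.flume.sink.hdfs;

    import java.io.IOException;

    import org.apache.flume.Event;
    import org.apache.flume.sink.FlumeFormatter;

    // Hypothetical variant; not part of this commit.
    public class HDFSBadDataStreamSketch extends HDFSDataStream {
      @Override
      public void append(Event e, FlumeFormatter fmt) throws IOException {
        if (e.getHeaders().containsKey("fault")) {
          throw new IOException("Injected fault");
        }
        super.append(e, fmt);
      }
    }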

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,22 @@
+package org.apache.flume.sink.hdfs;
+
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+
+public class HDFSBadSeqWriter extends HDFSSequenceFile {
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+
+    if (e.getHeaders().containsKey("fault")) {
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-once")) {
+      e.getHeaders().remove("fault-once");
+      throw new IOException("Injected fault");
+    }
+    super.append(e, fmt);
+  }
+
+}
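
The two headers give the tests two failure modes: "fault" fails every append, while "fault-once" fails a single append and then removes itself, so the rolled-back batch succeeds when the sink redelivers it. That is why testBadSimpleAppend below calls sink.process() one extra time after the loop. Marking an event, as that test does:

    Event event = new SimpleEvent();
    event.setBody("Test.1.1".getBytes());
    // Fail exactly one append; the sink rolls back and retries the batch.
    event.getHeaders().put("fault-once", "");
    channel.put(event);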

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,22 @@
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.sink.hdfs.HDFSBadSeqWriter;
+import org.apache.flume.sink.hdfs.HDFSBadDataStream;
+
+public class HDFSBadWriterFactory extends HDFSWriterFactory {
+  static final String BadSequenceFileType = "SequenceFile";
+  static final String BadDataStreamType = "DataStream";
+  static final String BadCompStreamType = "CompressedStream";
+
+  public HDFSWriter getWriter(String fileType) throws IOException {
+    if (fileType == BadSequenceFileType) {
+      return new HDFSBadSeqWriter();
+    } else if (fileType == BadDataStreamType) {
+      return new HDFSBadDataStream();
+    } else {
+      throw new IOException("File type " + fileType + " not supported");
+    }
+  }
+}
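
The "Bad" type constants reuse the same strings as the real factory ("SequenceFile", "DataStream", "CompressedStream"), so the standard hdfs.fileType values route to fault-injecting writers once this factory is passed to the sink. As testBadSimpleAppend below wires it:

    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
    HDFSEventSink sink = new HDFSEventSink(badWriterFactory);
    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);  // "SequenceFile"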

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Wed Oct 12 19:35:57 2011
@@ -17,31 +17,60 @@
  */
 package org.apache.flume.sink.hdfs;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Calendar;
 
+import junit.framework.Assert;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.MultiOpMemChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.sink.hdfs.HDFSEventSink;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.flume.sink.hdfs.HDFSBadWriterFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class TestHDFSEventSink {
+public class TestHDFSEventSink extends TestCase {
 
   private HDFSEventSink sink;
   private String testPath;
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
+
+  private void dirCleanup() {
+    Configuration conf = new Configuration();
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      Path dirPath = new Path(testPath);
+      fs.delete(dirPath, true);
+    } catch (IOException eIO) {
+      LOG.warn("IO Error in test cleanup", eIO);
+    }
+  }
 
+  // TODO: use System.getProperty("file.separator") instead of hardcoded '/'
   @Before
   public void setUp() {
     /*
@@ -50,12 +79,17 @@ public class TestHDFSEventSink {
      * Hadoop config points at file:/// rather than hdfs://. We need to find a
      * better way of testing HDFS related functionality.
      */
-    testPath = "/user/flume/testdata";
+    testPath = "file:///tmp/fluem-test." + Calendar.getInstance().getTimeInMillis() 
+        + "." + Thread.currentThread().getId();
+
     sink = new HDFSEventSink();
+    dirCleanup();
   }
 
   @After
   public void tearDown() {
+    if( System.getenv("hdfs_keepFiles") == null)
+      dirCleanup();
   }
 
   @Test
@@ -75,28 +109,34 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testAppend() throws InterruptedException, LifecycleException,
+  public void testTextAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
 
     final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
+    String newPath = testPath + "/singleTextBucket";
+    int totalEvents = 0;
+    int i=1,j=1;
 
     // clear the test directory
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    Path dirPath = new Path(testPath);
+    Path dirPath = new Path(newPath);
     fs.delete(dirPath, true);
     fs.mkdirs(dirPath);
 
     Context context = new Context();
 
-    context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+//    context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+    context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.writeFormat","Text");
+    context.put("hdfs.fileType", "DataStream");
 
     Configurables.configure(sink, context);
 
@@ -111,9 +151,9 @@ public class TestHDFSEventSink {
     Calendar eventDate = Calendar.getInstance();
 
     // push the event batches into channel
-    for (int i = 1; i < 4; i++) {
+    for (i = 1; i < 4; i++) {
       txn.begin();
-      for (int j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= txnMax; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -123,6 +163,7 @@ public class TestHDFSEventSink {
 
         event.setBody(("Test." + i + "." + j).getBytes());
         channel.put(event);
+        totalEvents++;
       }
       txn.commit();
 
@@ -132,25 +173,138 @@ public class TestHDFSEventSink {
 
     sink.stop();
 
-    /*
-     * 
-     * // loop through all the files generated and check their contains
-     * FileStatus[] dirStat = fs.listStatus(dirPath); Path fList[] =
-     * FileUtil.stat2Paths(dirStat);
-     * 
-     * try { for (int cnt = 0; cnt < fList.length; cnt++) { SequenceFile.Reader
-     * reader = new SequenceFile.Reader(fs, fList[cnt], conf); LongWritable key
-     * = new LongWritable(); BytesWritable value = new BytesWritable();
-     * 
-     * while (reader.next(key, value)) { logger.info(key+ ":" +
-     * value.toString()); } reader.close(); } } catch (IOException ioe) {
-     * System.err.println("IOException during operation: " + ioe.toString());
-     * System.exit(1); }
-     */
+    // loop through all the files generated and check their contents
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path fList[] = FileUtil.stat2Paths(dirStat);
+
+    // check that the roll happened correctly for the given data
+    // Note that we'll end up with one last file with only a header
+    Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+
+    try {
+      i = j = 1;
+      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+        Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+        FSDataInputStream input = fs.open(filePath);
+        BufferedReader d = new BufferedReader(new InputStreamReader(input));
+        String line;
+
+        while ((line =  d.readLine()) != null) {
+          Assert.assertEquals(line, ("Test." + i + "." + j));
+          if ( ++j > txnMax) {
+            j = 1;
+            i++;
+          }
+        }
+        input.close(); 
+       }
+    } catch (IOException ioe) {
+      System.err.println("IOException during operation: " + ioe.toString());
+      return; 
+    } 
+    Assert.assertEquals(i, 4);    
   }
 
   @Test
-  public void testTextAppend() throws InterruptedException, LifecycleException,
+  public void testSimpleAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+
+    final long txnMax = 25;
+    final String fileName = "FlumeData";
+    final long rollCount = 5;
+    final long batchSize = 2;
+    final int numBatches = 4;
+    String newPath = testPath + "/singleBucket";
+    int totalEvents = 0;
+    int i=1,j=1;
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new MultiOpMemChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+
+        event.setBody(("Test." + i + "." + j).getBytes());
+        channel.put(event);
+        totalEvents ++;
+      }
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      sink.process();
+    }
+
+    sink.stop();
+
+
+    // loop through all the files generated and check their contents
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path fList[] = FileUtil.stat2Paths(dirStat);
+
+    // check that the roll happened correctly for the given data
+    // Note that we'll end up with one last file with only a header
+    Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+
+    try {
+      i = j = 1;
+      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+        Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+        LongWritable key = new LongWritable(); 
+        BytesWritable value = new BytesWritable();
+        BytesWritable expValue;
+
+        while (reader.next(key, value)) {
+          expValue =  new BytesWritable(("Test." + i + "." + j).getBytes());
+          Assert.assertEquals(expValue, value);
+          if ( ++j > txnMax) {
+            j = 1;
+            i++;
+          }
+        }
+        reader.close(); 
+       } 
+    } catch (IOException ioe) {
+      System.err.println("IOException during operation: " + ioe.toString());
+      System.exit(1); 
+    } 
+    Assert.assertEquals(i, 4);    
+    
+  }
+
+  @Test
+  public void testAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
 
     final long txnMax = 25;
@@ -172,8 +326,6 @@ public class TestHDFSEventSink {
     context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.writeFormat","Text");
-    context.put("hdfs.fileType", "DataStream");
 
     Configurables.configure(sink, context);
 
@@ -226,5 +378,80 @@ public class TestHDFSEventSink {
      */
   }
 
-  
+
+  // inject fault and make sure that the txn is rolled back and retried
+  @Test
+  public void testBadSimpleAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+
+    final long txnMax = 25;
+    final String fileName = "FlumeData";
+    final long rollCount = 5;
+    final long batchSize = 2;
+    final int numBatches = 4;
+    String newPath = testPath + "/singleBucket";
+    int totalEvents = 0;
+    int i=1,j=1;
+
+    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new MultiOpMemChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+
+        event.setBody(("Test." + i + "." + j).getBytes());
+        // inject fault
+        if ((totalEvents % 30) == 1) {
+          event.getHeaders().put("fault-once", "");
+        }
+        channel.put(event);
+        totalEvents ++;
+      }
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      sink.process();
+    }
+    LOG.info("clear any events pending due to errors");
+    // clear any events pending due to errors
+    sink.process();
+
+    sink.stop();
+  }
+
 }
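
For reference, the file-count assertions in the new tests follow directly from the settings: each test commits 3 batches of txnMax = 25 events (the batch loop stops before 4), and the sink rolls a new file every rollCount events, always leaving one last file that holds only a header. A worked check:

    int totalEvents = 3 * 25;                      // 75 events pushed and committed
    int expectedSeqFiles  = totalEvents / 5 + 1;   // testSimpleAppend (rollCount=5): 16
    int expectedTextFiles = totalEvents / 3 + 1;   // testTextAppend   (rollCount=3): 26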