Posted to commits@flume.apache.org by hs...@apache.org on 2013/03/01 09:59:03 UTC

git commit: FLUME-1916. HDFS sink should poll for # of active replicas. If less than required, roll the file.

Updated Branches:
  refs/heads/trunk ed4cae79f -> 17338bf30


FLUME-1916. HDFS sink should poll for # of active replicas. If less than required, roll the file.

(Mike Percy via Hari Shreedharan)
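
In short: while a file is open, the writer polls HDFS for the number of live replicas of the current block and, if it has dropped below the required minimum, the sink rolls to a new file, with a cap on consecutive forced rolls so a degraded cluster cannot cause endless rolling. A minimal standalone sketch of that policy (class and field names here are made up; the real logic lands in AbstractHDFSWriter.isUnderReplicated() and BucketWriter.shouldRotate() in the diff below):

  // Hypothetical sketch of the roll-on-under-replication policy added by this patch.
  public class UnderReplicationRollPolicy {

    private final int requiredReplicas;         // hdfs.minBlockReplicas, or the FS default
    private final int maxConsecutiveRolls = 30; // safety cap on back-to-back forced rolls
    private int consecutiveRolls = 0;

    public UnderReplicationRollPolicy(int requiredReplicas) {
      this.requiredReplicas = requiredReplicas;
    }

    /** Decide whether to roll, given the replica count reported for the open file. */
    public boolean shouldRoll(int currentReplicas) {
      if (currentReplicas < 0) {
        return false; // replica count unavailable (no HDFS-826 support): never force a roll
      }
      if (currentReplicas >= requiredReplicas) {
        consecutiveRolls = 0; // healthy again, reset the cap
        return false;
      }
      if (consecutiveRolls >= maxConsecutiveRolls) {
        return false; // under-replicated, but we already rolled too many times in a row
      }
      consecutiveRolls++;
      return true;
    }
  }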


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/17338bf3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/17338bf3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/17338bf3

Branch: refs/heads/trunk
Commit: 17338bf303e617054576813b02d057b98753b6aa
Parents: ed4cae7
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Mar 1 00:57:36 2013 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Mar 1 00:57:36 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    1 +
 .../apache/flume/sink/hdfs/AbstractHDFSWriter.java |  166 +++++++++++++++
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   39 ++++-
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |    9 +-
 .../org/apache/flume/sink/hdfs/HDFSDataStream.java |   10 +-
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |   20 ++-
 .../org/apache/flume/sink/hdfs/HDFSWriter.java     |    6 +
 .../org/apache/flume/sink/hdfs/MockHDFSWriter.java |    5 +
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |    5 +
 .../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java  |  133 +++++++++++-
 10 files changed, 375 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5ac903e..01067d1 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1371,6 +1371,7 @@ hdfs.fileType           SequenceFile  File format: currently ``SequenceFile``, `
                                       (1)DataStream will not compress output file and please don't set codeC
                                       (2)CompressedStream requires set hdfs.codeC with an available codeC
 hdfs.maxOpenFiles       5000          Allow only this number of open files. If this number is exceeded, the oldest file is closed.
+hdfs.minBlockReplicas   --            Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
 hdfs.writeFormat        --            "Text" or "Writable"
 hdfs.callTimeout        10000         Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.
                                       This number should be increased if many HDFS timeout operations are occurring.
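
For reference, hdfs.minBlockReplicas is set like any other HDFS sink property in the agent configuration; the agent and sink names below are made up for illustration:

  agent1.sinks.hdfsSink.type = hdfs
  agent1.sinks.hdfsSink.hdfs.path = hdfs://namenode/flume/events
  # roll the current file if its block falls below 2 live replicas
  agent1.sinks.hdfsSink.hdfs.minBlockReplicas = 2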

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
new file mode 100644
index 0000000..ff4f223
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractHDFSWriter implements HDFSWriter {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractHDFSWriter.class);
+
+  private FSDataOutputStream outputStream;
+  private FileSystem fs;
+  private Method refGetNumCurrentReplicas = null;
+  private Integer configuredMinReplicas = null;
+
+  final static Object [] NO_ARGS = new Object []{};
+
+  @Override
+  public void configure(Context context) {
+    configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
+    if (configuredMinReplicas != null) {
+      Preconditions.checkArgument(configuredMinReplicas >= 0,
+          "hdfs.minBlockReplicas must be greater than or equal to 0");
+    }
+  }
+
+  /**
+   * Contract for subclasses: Call registerCurrentStream() on open,
+   * unregisterCurrentStream() on close, and the base class takes care of the
+   * rest.
+   * @return true if the block currently being written is under-replicated, false otherwise
+   */
+  @Override
+  public boolean isUnderReplicated() {
+    try {
+      int numBlocks = getNumCurrentReplicas();
+      if (numBlocks == -1) {
+        return false;
+      }
+      int desiredBlocks;
+      if (configuredMinReplicas != null) {
+        desiredBlocks = configuredMinReplicas;
+      } else {
+        desiredBlocks = getFsDesiredReplication();
+      }
+      return numBlocks < desiredBlocks;
+    } catch (IllegalAccessException e) {
+      logger.error("Unexpected error while checking replication factor", e);
+    } catch (InvocationTargetException e) {
+      logger.error("Unexpected error while checking replication factor", e);
+    } catch (IllegalArgumentException e) {
+      logger.error("Unexpected error while checking replication factor", e);
+    }
+    return false;
+  }
+
+  protected void registerCurrentStream(FSDataOutputStream outputStream,
+                                      FileSystem fs) {
+    Preconditions.checkNotNull(outputStream, "outputStream must not be null");
+    Preconditions.checkNotNull(fs, "fs must not be null");
+
+    this.outputStream = outputStream;
+    this.fs = fs;
+    this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
+  }
+
+  protected void unregisterCurrentStream() {
+    this.outputStream = null;
+    this.fs = null;
+    this.refGetNumCurrentReplicas = null;
+  }
+
+  public int getFsDesiredReplication() {
+    if (fs != null) {
+      return fs.getDefaultReplication();
+    }
+    return 0;
+  }
+
+  /**
+   * This method gets the datanode replication count for the current open file.
+   *
+   * If the pipeline isn't started yet or is empty, you will get the default
+   * replication factor.
+   *
+   * <p/>If this function returns -1, it means you
+   * are not properly running with the HDFS-826 patch.
+   * @throws InvocationTargetException
+   * @throws IllegalAccessException
+   * @throws IllegalArgumentException
+   */
+  public int getNumCurrentReplicas()
+      throws IllegalArgumentException, IllegalAccessException,
+          InvocationTargetException {
+    if (refGetNumCurrentReplicas != null && outputStream != null) {
+      OutputStream dfsOutputStream = outputStream.getWrappedStream();
+      if (dfsOutputStream != null) {
+        Object repl = refGetNumCurrentReplicas.invoke(dfsOutputStream, NO_ARGS);
+        if (repl instanceof Integer) {
+          return ((Integer)repl).intValue();
+        }
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
+   * @return Method or null.
+   */
+  private Method reflectGetNumCurrentReplicas(FSDataOutputStream os) {
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
+          .getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
+            new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        logger.info("FileSystem's output stream doesn't support"
+            + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
+            + wrappedStreamClass.getName() + "; err=" + e);
+      } catch (SecurityException e) {
+        logger.info("Doesn't have access to getNumCurrentReplicas on "
+            + "FileSystems's output stream --HDFS-826 not available; fsOut="
+            + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    if (m != null) {
+      logger.debug("Using getNumCurrentReplicas--HDFS-826");
+    }
+    return m;
+  }
+
+}
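
The contract spelled out in the class above (register the stream on open, unregister on close) could look roughly like this in a concrete writer. This is only a sketch: the class name and method bodies are invented, the second open() overload's signature is assumed from the HDFSWriter interface, and the real subclasses are the ones updated further down in this diff.

  import java.io.IOException;

  import org.apache.flume.Event;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile.CompressionType;
  import org.apache.hadoop.io.compress.CompressionCodec;

  public class SketchHDFSWriter extends AbstractHDFSWriter {

    private FileSystem hdfs;
    private FSDataOutputStream outStream;

    @Override
    public void open(String filePath) throws IOException {
      Path dstPath = new Path(filePath);
      hdfs = dstPath.getFileSystem(new Configuration());
      outStream = hdfs.create(dstPath);
      // Hand the live output stream to the base class so that
      // isUnderReplicated() can poll it for the current replica count.
      registerCurrentStream(outStream, hdfs);
    }

    @Override
    public void open(String filePath, CompressionCodec codec,
        CompressionType cType) throws IOException {
      open(filePath); // compression is ignored in this sketch
    }

    @Override
    public void append(Event e) throws IOException {
      outStream.write(e.getBody());
    }

    @Override
    public void sync() throws IOException {
      outStream.flush();
      outStream.sync();
    }

    @Override
    public void close() throws IOException {
      outStream.close();
      // Drop the stream reference so a closed file is never polled.
      unregisterCurrentStream();
    }
  }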

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index cdc37f6..c11fb20 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -91,6 +91,8 @@ class BucketWriter {
   private volatile String targetPath;
   private volatile long batchCounter;
   private volatile boolean isOpen;
+  private volatile boolean isUnderReplicated;
+  private volatile int consecutiveUnderReplRotateCount;
   private volatile ScheduledFuture<Void> timedRollFuture;
   private SinkCounter sinkCounter;
   private final int idleTimeout;
@@ -99,6 +101,7 @@ class BucketWriter {
   private final String onIdleCallbackPath;
   private final long callTimeout;
   private final ExecutorService callTimeoutPool;
+  private final int maxConsecUnderReplRotations = 30; // make this config'able?
 
   private Clock clock = new SystemClock();
 
@@ -137,6 +140,7 @@ class BucketWriter {
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
+    isUnderReplicated = false;
     this.writer.configure(context);
   }
 
@@ -189,6 +193,9 @@ class BucketWriter {
         return null;
       }
     });
+
+    // ensure new files reset under-rep rotate count
+    consecutiveUnderReplRotateCount = 0;
   }
 
   /**
@@ -417,8 +424,29 @@ class BucketWriter {
 
     // check if it's time to rotate the file
     if (shouldRotate()) {
-      close();
-      open();
+      boolean doRotate = true;
+
+      if (isUnderReplicated) {
+        if (maxConsecUnderReplRotations > 0 &&
+            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
+          doRotate = false;
+          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
+            LOG.error("Hit max consecutive under-replication rotations ({}); " +
+                "will not continue rolling files under this path due to " +
+                "under-replication", maxConsecUnderReplRotations);
+          }
+        } else {
+          LOG.warn("Block Under-replication detected. Rotating file.");
+        }
+        consecutiveUnderReplRotateCount++;
+      } else {
+        consecutiveUnderReplRotateCount = 0;
+      }
+
+      if (doRotate) {
+        close();
+        open();
+      }
     }
 
     // write the event
@@ -460,6 +488,13 @@ class BucketWriter {
   private boolean shouldRotate() {
     boolean doRotate = false;
 
+    if (writer.isUnderReplicated()) {
+      this.isUnderReplicated = true;
+      doRotate = true;
+    } else {
+      this.isUnderReplicated = false;
+    }
+
     if ((rollCount > 0) && (rollCount <= eventCounter)) {
       LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
       doRotate = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 1f3521e..0c618b5 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HDFSCompressedDataStream implements HDFSWriter {
+public class HDFSCompressedDataStream extends AbstractHDFSWriter {
 
   private static final Logger logger =
       LoggerFactory.getLogger(HDFSCompressedDataStream.class);
@@ -51,6 +51,8 @@ public class HDFSCompressedDataStream implements HDFSWriter {
 
   @Override
   public void configure(Context context) {
+    super.configure(context);
+
     serializerType = context.getString("serializer", "TEXT");
     useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
         false);
@@ -99,6 +101,9 @@ public class HDFSCompressedDataStream implements HDFSWriter {
       throw new IOException("serializer (" + serializerType
           + ") does not support append");
     }
+
+    registerCurrentStream(fsOut, hdfs);
+
     if (appending) {
       serializer.afterReopen();
     } else {
@@ -143,6 +148,8 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     fsOut.flush();
     fsOut.sync();
     cmpOut.close();
+
+    unregisterCurrentStream();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index 4ea4327..c87fafe 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HDFSDataStream implements HDFSWriter {
+public class HDFSDataStream extends AbstractHDFSWriter {
 
   private static final Logger logger =
       LoggerFactory.getLogger(HDFSDataStream.class);
@@ -46,6 +46,8 @@ public class HDFSDataStream implements HDFSWriter {
 
   @Override
   public void configure(Context context) {
+    super.configure(context);
+
     serializerType = context.getString("serializer", "TEXT");
     useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
         false);
@@ -87,6 +89,9 @@ public class HDFSDataStream implements HDFSWriter {
           ") does not support append");
     }
 
+    // must call superclass to check for replication issues
+    registerCurrentStream(outStream, hdfs);
+
     if (appending) {
       serializer.afterReopen();
     } else {
@@ -102,7 +107,6 @@ public class HDFSDataStream implements HDFSWriter {
 
   @Override
   public void append(Event e) throws IOException {
-    // shun flumeformatter...
     serializer.write(e);
   }
 
@@ -120,6 +124,8 @@ public class HDFSDataStream implements HDFSWriter {
     outStream.flush();
     outStream.sync();
     outStream.close();
+
+    unregisterCurrentStream();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 3bd25f4..1a401d6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HDFSSequenceFile implements HDFSWriter {
+public class HDFSSequenceFile extends AbstractHDFSWriter {
 
   private static final Logger logger =
       LoggerFactory.getLogger(HDFSSequenceFile.class);
@@ -41,6 +41,7 @@ public class HDFSSequenceFile implements HDFSWriter {
   private Context serializerContext;
   private SequenceFileSerializer serializer;
   private boolean useRawLocalFileSystem;
+  private FSDataOutputStream outStream = null;
 
   public HDFSSequenceFile() {
     writer = null;
@@ -48,6 +49,8 @@ public class HDFSSequenceFile implements HDFSWriter {
 
   @Override
   public void configure(Context context) {
+    super.configure(context);
+
     // use binary writable serialize by default
     writeFormat = context.getString("hdfs.writeFormat",
       SequenceFileSerializerType.Writable.name());
@@ -82,14 +85,14 @@ public class HDFSSequenceFile implements HDFSWriter {
     }
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
             (dstPath)) {
-      FSDataOutputStream outStream = hdfs.append(dstPath);
-      writer = SequenceFile.createWriter(conf, outStream, serializer
-        .getKeyClass(),
-        serializer.getValueClass(), compType, codeC);
+      outStream = hdfs.append(dstPath);
     } else {
-      writer = SequenceFile.createWriter(hdfs, conf, dstPath,
-        serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
+      outStream = hdfs.create(dstPath);
     }
+    writer = SequenceFile.createWriter(conf, outStream,
+        serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
+
+    registerCurrentStream(outStream, hdfs);
   }
 
   @Override
@@ -107,5 +110,8 @@ public class HDFSSequenceFile implements HDFSWriter {
   @Override
   public void close() throws IOException {
     writer.close();
+    outStream.close();
+
+    unregisterCurrentStream();
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
index abca21f..44a984a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
@@ -21,10 +21,14 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 
 import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.conf.Configurable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public interface HDFSWriter extends Configurable {
 
   public void open(String filePath) throws IOException;
@@ -38,4 +42,6 @@ public interface HDFSWriter extends Configurable {
 
   public void close() throws IOException;
 
+  public boolean isUnderReplicated();
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index 5e8628b..ec49b97 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -86,4 +86,9 @@ public class MockHDFSWriter implements HDFSWriter {
     filesClosed++;
   }
 
+  @Override
+  public boolean isUnderReplicated() {
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 99b6150..99e787e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -190,6 +190,11 @@ public class TestBucketWriter {
         open = false;
       }
 
+      @Override
+      public boolean isUnderReplicated() {
+        return false;
+      }
+
       public void append(Event e) throws IOException {
         // we just re-open in append if closed
         open = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/17338bf3/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index 2e71069..c2b96f7 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -35,9 +35,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,14 +55,13 @@ public class TestHDFSEventSinkOnMiniCluster {
 
   private static final boolean KEEP_DATA = false;
   private static final String DFS_DIR = "target/test/dfs";
-  private static final Configuration CONF = new Configuration();
   private static final String TEST_BUILD_DATA_KEY = "test.build.data";
 
   private static MiniDFSCluster cluster = null;
   private static String oldTestBuildDataProp = null;
 
   @BeforeClass
-  public static void setup() throws IOException {
+  public static void setupClass() throws IOException {
     // set up data dir for HDFS
     File dfsDir = new File(DFS_DIR);
     if (!dfsDir.isDirectory()) {
@@ -69,9 +70,6 @@ public class TestHDFSEventSinkOnMiniCluster {
     // save off system prop to restore later
     oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY);
     System.setProperty(TEST_BUILD_DATA_KEY, DFS_DIR);
-
-    cluster = new MiniDFSCluster(CONF, 1, true, null);
-    cluster.waitActive();
   }
 
   private static String getNameNodeURL(MiniDFSCluster cluster) {
@@ -84,6 +82,9 @@ public class TestHDFSEventSinkOnMiniCluster {
    */
   @Test
   public void simpleHDFSTest() throws EventDeliveryException, IOException {
+    cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    cluster.waitActive();
+
     String outputDir = "/flume/simpleHDFSTest";
     Path outputDirPath = new Path(outputDir);
 
@@ -150,6 +151,9 @@ public class TestHDFSEventSinkOnMiniCluster {
     if (!KEEP_DATA) {
       fs.delete(outputDirPath, true);
     }
+
+    cluster.shutdown();
+    cluster = null;
   }
 
   /**
@@ -157,6 +161,9 @@ public class TestHDFSEventSinkOnMiniCluster {
    */
   @Test
   public void simpleHDFSGZipCompressedTest() throws EventDeliveryException, IOException {
+    cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    cluster.waitActive();
+
     String outputDir = "/flume/simpleHDFSGZipCompressedTest";
     Path outputDirPath = new Path(outputDir);
 
@@ -241,13 +248,125 @@ public class TestHDFSEventSinkOnMiniCluster {
     if (!KEEP_DATA) {
       fs.delete(outputDirPath, true);
     }
+
+    cluster.shutdown();
+    cluster = null;
   }
 
-  @AfterClass
-  public static void teardown() {
+  /**
+   * Tests that the sink detects HDFS block under-replication (after a datanode is stopped) and rolls to new files.
+   */
+  @Test
+  public void underReplicationTest() throws EventDeliveryException,
+      IOException {
+    Configuration conf = new Configuration();
+    conf.set("dfs.replication", String.valueOf(3));
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+
+    String outputDir = "/flume/underReplicationTest";
+    Path outputDirPath = new Path(outputDir);
+
+    logger.info("Running test with output dir: {}", outputDir);
+
+    FileSystem fs = cluster.getFileSystem();
+    // ensure output directory is empty
+    if (fs.exists(outputDirPath)) {
+      fs.delete(outputDirPath, true);
+    }
+
+    String nnURL = getNameNodeURL(cluster);
+    logger.info("Namenode address: {}", nnURL);
+
+    Context chanCtx = new Context();
+    MemoryChannel channel = new MemoryChannel();
+    channel.setName("simpleHDFSTest-mem-chan");
+    channel.configure(chanCtx);
+    channel.start();
+
+    Context sinkCtx = new Context();
+    sinkCtx.put("hdfs.path", nnURL + outputDir);
+    sinkCtx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
+    sinkCtx.put("hdfs.batchSize", Integer.toString(1));
+
+    HDFSEventSink sink = new HDFSEventSink();
+    sink.setName("simpleHDFSTest-hdfs-sink");
+    sink.configure(sinkCtx);
+    sink.setChannel(channel);
+    sink.start();
+
+    // create an event
+    channel.getTransaction().begin();
+    try {
+      channel.put(EventBuilder.withBody("yarg 1", Charsets.UTF_8));
+      channel.put(EventBuilder.withBody("yarg 2", Charsets.UTF_8));
+      channel.put(EventBuilder.withBody("yarg 3", Charsets.UTF_8));
+      channel.put(EventBuilder.withBody("yarg 4", Charsets.UTF_8));
+      channel.put(EventBuilder.withBody("yarg 5", Charsets.UTF_8));
+      channel.put(EventBuilder.withBody("yarg 5", Charsets.UTF_8));
+      channel.getTransaction().commit();
+    } finally {
+      channel.getTransaction().close();
+    }
+
+    // store events to HDFS
+    logger.info("Running process(). Create new file.");
+    sink.process(); // create new file;
+    logger.info("Running process(). Same file.");
+    sink.process();
+
+    // kill a datanode
+    logger.info("Killing datanode #1...");
+    cluster.stopDataNode(0);
+
+    // there is a race here.. the client may or may not notice that the
+    // datanode is dead before it next sync()s.
+    // so, this next call may or may not roll a new file.
+
+    logger.info("Running process(). Create new file? (racy)");
+    sink.process();
+
+    logger.info("Running process(). Create new file.");
+    sink.process();
+
+    logger.info("Running process(). Create new file.");
+    sink.process();
+
+    logger.info("Running process(). Create new file.");
+    sink.process();
+
+    // shut down flume
+    sink.stop();
+    channel.stop();
+
+    // verify that it's in HDFS and that its content is what we say it should be
+    FileStatus[] statuses = fs.listStatus(outputDirPath);
+    Assert.assertNotNull("No files found written to HDFS", statuses);
+
+    for (FileStatus status : statuses) {
+      Path filePath = status.getPath();
+      logger.info("Found file on DFS: {}", filePath);
+      FSDataInputStream stream = fs.open(filePath);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+      String line = reader.readLine();
+      logger.info("First line in file {}: {}", filePath, line);
+      Assert.assertTrue(line.startsWith("yarg"));
+    }
+
+    Assert.assertTrue("4 or 5 files expected",
+        statuses.length == 4 || statuses.length == 5);
+    System.out.println("There are " + statuses.length + " files.");
+
+    if (!KEEP_DATA) {
+      fs.delete(outputDirPath, true);
+    }
+
     cluster.shutdown();
     cluster = null;
+  }
 
+  @AfterClass
+  public static void teardownClass() {
     // restore system state, if needed
     if (oldTestBuildDataProp != null) {
       System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp);