Posted to commits@flume.apache.org by hs...@apache.org on 2013/03/09 04:02:28 UTC

git commit: FLUME-1937. Issue with maxUnderReplication in HDFS sink.

Updated Branches:
  refs/heads/trunk e72e559ba -> b28b87b58


FLUME-1937. Issue with maxUnderReplication in HDFS sink.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b28b87b5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b28b87b5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b28b87b5

Branch: refs/heads/trunk
Commit: b28b87b58b2b08d31cac055cb3f9f8762f65b469
Parents: e72e559
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Mar 8 19:00:29 2013 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Mar 8 19:00:29 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |    8 +-
 .../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java  |  107 ++++++++++++++-
 2 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index c11fb20..774f297 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -92,7 +92,7 @@ class BucketWriter {
   private volatile long batchCounter;
   private volatile boolean isOpen;
   private volatile boolean isUnderReplicated;
-  private volatile int consecutiveUnderReplRotateCount;
+  private volatile int consecutiveUnderReplRotateCount = 0;
   private volatile ScheduledFuture<Void> timedRollFuture;
   private SinkCounter sinkCounter;
   private final int idleTimeout;
@@ -193,9 +193,6 @@ class BucketWriter {
         return null;
       }
     });
-
-    // ensure new files reset under-rep rotate count
-    consecutiveUnderReplRotateCount = 0;
   }
 
   /**
@@ -576,7 +573,8 @@ class BucketWriter {
     } catch (TimeoutException eT) {
       future.cancel(true);
       sinkCounter.incrementConnectionFailedCount();
-      throw new IOException("Callable timed out after " + callTimeout + " ms",
+      throw new IOException("Callable timed out after " + callTimeout + " ms" +
+          " on file: " + bucketPath,
         eT);
     } catch (ExecutionException e1) {
       sinkCounter.incrementConnectionFailedCount();

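For context on the BucketWriter change above: consecutiveUnderReplRotateCount is now initialized once at field declaration, and the reset that used to run whenever a new file was created has been removed. With the old behaviour every freshly rolled file started the counter back at zero, so the cap on consecutive under-replication rotations could never take effect and a cluster that stayed under-replicated kept rolling a new file on every batch. The sketch below shows how such a cap can bound rotation; the two volatile fields mirror the diff, but maxConsecUnderReplRotations and the surrounding class are illustrative assumptions, not the verbatim BucketWriter code.

    // Illustrative only (not the verbatim BucketWriter): how a cap on consecutive
    // under-replication rolls can bound file rotation. Field names mirror the diff;
    // maxConsecUnderReplRotations is an assumed limit.
    class UnderReplicationRollPolicy {
      private volatile boolean isUnderReplicated;
      private volatile int consecutiveUnderReplRotateCount = 0;
      private final int maxConsecUnderReplRotations;

      UnderReplicationRollPolicy(int maxConsecUnderReplRotations) {
        this.maxConsecUnderReplRotations = maxConsecUnderReplRotations;
      }

      void setUnderReplicated(boolean underReplicated) {
        this.isUnderReplicated = underReplicated;
      }

      // Returns true if a new file should be rolled because of under-replication.
      boolean shouldRotate() {
        if (!isUnderReplicated) {
          consecutiveUnderReplRotateCount = 0;   // pipeline is healthy again
          return false;
        }
        if (consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          return false;   // stop rolling; keep writing to the current file
        }
        consecutiveUnderReplRotateCount++;
        return true;
      }
    }

Because the counter is no longer cleared simply because a new file was opened, consecutive under-replicated rolls are bounded, which is the behaviour the new test below checks.
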
http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index c2b96f7..6e11624 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -353,7 +353,7 @@ public class TestHDFSEventSinkOnMiniCluster {
       Assert.assertTrue(line.startsWith("yarg"));
     }
 
-    Assert.assertTrue("4 or 5 files expected",
+    Assert.assertTrue("4 or 5 files expected, found " + statuses.length,
         statuses.length == 4 || statuses.length == 5);
     System.out.println("There are " + statuses.length + " files.");
 
@@ -365,6 +365,111 @@ public class TestHDFSEventSinkOnMiniCluster {
     cluster = null;
   }
 
+  /**
+   * Writes a batch of events to HDFS, kills a datanode, and verifies that the
+   * number of files rolled due to block under-replication is bounded.
+   */
+  @Test
+  public void maxUnderReplicationTest() throws EventDeliveryException,
+      IOException {
+    Configuration conf = new Configuration();
+    conf.set("dfs.replication", String.valueOf(3));
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+
+    String outputDir = "/flume/underReplicationTest";
+    Path outputDirPath = new Path(outputDir);
+
+    logger.info("Running test with output dir: {}", outputDir);
+
+    FileSystem fs = cluster.getFileSystem();
+    // ensure output directory is empty
+    if (fs.exists(outputDirPath)) {
+      fs.delete(outputDirPath, true);
+    }
+
+    String nnURL = getNameNodeURL(cluster);
+    logger.info("Namenode address: {}", nnURL);
+
+    Context chanCtx = new Context();
+    MemoryChannel channel = new MemoryChannel();
+    channel.setName("simpleHDFSTest-mem-chan");
+    channel.configure(chanCtx);
+    channel.start();
+
+    Context sinkCtx = new Context();
+    sinkCtx.put("hdfs.path", nnURL + outputDir);
+    sinkCtx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
+    sinkCtx.put("hdfs.batchSize", Integer.toString(1));
+
+    HDFSEventSink sink = new HDFSEventSink();
+    sink.setName("simpleHDFSTest-hdfs-sink");
+    sink.configure(sinkCtx);
+    sink.setChannel(channel);
+    sink.start();
+
+    // put 50 small events on the channel
+    channel.getTransaction().begin();
+    try {
+      for (int i = 0; i < 50; i++) {
+        channel.put(EventBuilder.withBody("yarg " + i, Charsets.UTF_8));
+      }
+      channel.getTransaction().commit();
+    } finally {
+      channel.getTransaction().close();
+    }
+
+    // store events to HDFS
+    logger.info("Running process(). Create new file.");
+    sink.process(); // create new file;
+    logger.info("Running process(). Same file.");
+    sink.process();
+
+    // kill a datanode
+    logger.info("Killing datanode #1...");
+    cluster.stopDataNode(0);
+
+    // there is a race here.. the client may or may not notice that the
+    // datanode is dead before it next sync()s.
+    // so, this next call may or may not roll a new file.
+
+    logger.info("Running process(). Create new file? (racy)");
+    sink.process();
+
+    for (int i = 3; i < 50; i++) {
+      logger.info("Running process().");
+      sink.process();
+    }
+
+    // shut down flume
+    sink.stop();
+    channel.stop();
+
+    // verify that it's in HDFS and that its content is what we say it should be
+    FileStatus[] statuses = fs.listStatus(outputDirPath);
+    Assert.assertNotNull("No files found written to HDFS", statuses);
+
+    for (FileStatus status : statuses) {
+      Path filePath = status.getPath();
+      logger.info("Found file on DFS: {}", filePath);
+      FSDataInputStream stream = fs.open(filePath);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+      String line = reader.readLine();
+      logger.info("First line in file {}: {}", filePath, line);
+      Assert.assertTrue(line.startsWith("yarg"));
+    }
+
+    System.out.println("There are " + statuses.length + " files.");
+    Assert.assertEquals("31 files expected, found " + statuses.length,
+        31, statuses.length);
+
+    if (!KEEP_DATA) {
+      fs.delete(outputDirPath, true);
+    }
+
+    cluster.shutdown();
+    cluster = null;
+  }
+
   @AfterClass
   public static void teardownClass() {
     // restore system state, if needed
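
One note on the new test's final assertion: the expected count of 31 files is consistent with a single file being created before the datanode is stopped, followed by a bounded number of under-replication rolls afterwards. A rough back-of-the-envelope, assuming the cap on consecutive under-replication rotations is 30 (the actual value is not visible in this diff):

    // Assumptions are marked; this only illustrates where 31 comes from.
    int filesBeforeDataNodeKill = 1;       // the first two process() calls share one open file
    int assumedUnderReplRollCap = 30;      // assumed cap on consecutive under-replication rolls
    int expectedFiles = filesBeforeDataNodeKill + assumedUnderReplRollCap;   // = 31

If the cap were absent, or were reset on every open as before this patch, each remaining process() call after the kill could roll its own file, which is the behaviour the assertion guards against.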