You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flume.apache.org by hs...@apache.org on 2013/03/09 04:02:28 UTC
git commit: FLUME-1937. Issue with maxUnderReplication in HDFS sink.
Updated Branches:
refs/heads/trunk e72e559ba -> b28b87b58
FLUME-1937. Issue with maxUnderReplication in HDFS sink.
(Mike Percy via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b28b87b5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b28b87b5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b28b87b5
Branch: refs/heads/trunk
Commit: b28b87b58b2b08d31cac055cb3f9f8762f65b469
Parents: e72e559
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Mar 8 19:00:29 2013 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Mar 8 19:00:29 2013 -0800
----------------------------------------------------------------------
.../org/apache/flume/sink/hdfs/BucketWriter.java | 8 +-
.../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java | 107 ++++++++++++++-
2 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index c11fb20..774f297 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -92,7 +92,7 @@ class BucketWriter {
private volatile long batchCounter;
private volatile boolean isOpen;
private volatile boolean isUnderReplicated;
- private volatile int consecutiveUnderReplRotateCount;
+ private volatile int consecutiveUnderReplRotateCount = 0;
private volatile ScheduledFuture<Void> timedRollFuture;
private SinkCounter sinkCounter;
private final int idleTimeout;
@@ -193,9 +193,6 @@ class BucketWriter {
return null;
}
});
-
- // ensure new files reset under-rep rotate count
- consecutiveUnderReplRotateCount = 0;
}
/**
@@ -576,7 +573,8 @@ class BucketWriter {
} catch (TimeoutException eT) {
future.cancel(true);
sinkCounter.incrementConnectionFailedCount();
- throw new IOException("Callable timed out after " + callTimeout + " ms",
+ throw new IOException("Callable timed out after " + callTimeout + " ms" +
+ " on file: " + bucketPath,
eT);
} catch (ExecutionException e1) {
sinkCounter.incrementConnectionFailedCount();
http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index c2b96f7..6e11624 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -353,7 +353,7 @@ public class TestHDFSEventSinkOnMiniCluster {
Assert.assertTrue(line.startsWith("yarg"));
}
- Assert.assertTrue("4 or 5 files expected",
+ Assert.assertTrue("4 or 5 files expected, found " + statuses.length,
statuses.length == 4 || statuses.length == 5);
System.out.println("There are " + statuses.length + " files.");
@@ -365,6 +365,111 @@ public class TestHDFSEventSinkOnMiniCluster {
cluster = null;
}
+ /**
+ * Regression test for FLUME-1937: writes 50 one-event batches through the HDFS sink on a 3-datanode cluster (dfs.replication = 3), stops one datanode after the first two batches, and expects exactly 31 output files.
+ */
+ @Test
+ public void maxUnderReplicationTest() throws EventDeliveryException,
+ IOException {
+ Configuration conf = new Configuration();
+ conf.set("dfs.replication", String.valueOf(3));
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ cluster.waitActive();
+
+ String outputDir = "/flume/underReplicationTest";
+ Path outputDirPath = new Path(outputDir);
+
+ logger.info("Running test with output dir: {}", outputDir);
+
+ FileSystem fs = cluster.getFileSystem();
+ // ensure output directory is empty
+ if (fs.exists(outputDirPath)) {
+ fs.delete(outputDirPath, true);
+ }
+
+ String nnURL = getNameNodeURL(cluster);
+ logger.info("Namenode address: {}", nnURL);
+
+ Context chanCtx = new Context();
+ MemoryChannel channel = new MemoryChannel();
+ channel.setName("simpleHDFSTest-mem-chan");
+ channel.configure(chanCtx);
+ channel.start();
+
+ Context sinkCtx = new Context();
+ sinkCtx.put("hdfs.path", nnURL + outputDir);
+ sinkCtx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
+ sinkCtx.put("hdfs.batchSize", Integer.toString(1));
+
+ HDFSEventSink sink = new HDFSEventSink();
+ sink.setName("simpleHDFSTest-hdfs-sink");
+ sink.configure(sinkCtx);
+ sink.setChannel(channel);
+ sink.start();
+
+ // enqueue all 50 test events ("yarg 0" .. "yarg 49") in a single channel transaction
+ channel.getTransaction().begin();
+ try {
+ for (int i = 0; i < 50; i++) {
+ channel.put(EventBuilder.withBody("yarg " + i, Charsets.UTF_8));
+ }
+ channel.getTransaction().commit();
+ } finally {
+ channel.getTransaction().close();
+ }
+
+ // drain the channel into HDFS, one event per process() call (hdfs.batchSize = 1)
+ logger.info("Running process(). Create new file.");
+ sink.process(); // creates a new file
+ logger.info("Running process(). Same file.");
+ sink.process(); // expected to append to the same file
+
+ // stop one of the three datanodes; with dfs.replication = 3, blocks written after this cannot be fully replicated
+ logger.info("Killing datanode #1...");
+ cluster.stopDataNode(0);
+
+ // There is a race here: the client may or may not notice that the
+ // datanode is dead before its next sync(),
+ // so this next call may or may not roll a new file.
+
+ logger.info("Running process(). Create new file? (racy)");
+ sink.process();
+
+ for (int i = 3; i < 50; i++) { // 47 more calls, for 50 process() calls (one per event) in total
+ logger.info("Running process().");
+ sink.process();
+ }
+
+ // shut down flume
+ sink.stop();
+ channel.stop();
+
+ // verify that files were written to HDFS and that each one starts with a "yarg" line
+ FileStatus[] statuses = fs.listStatus(outputDirPath);
+ Assert.assertNotNull("No files found written to HDFS", statuses);
+
+ for (FileStatus status : statuses) {
+ Path filePath = status.getPath();
+ logger.info("Found file on DFS: {}", filePath);
+ FSDataInputStream stream = fs.open(filePath);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); // NOTE(review): reader/stream are never closed
+ String line = reader.readLine();
+ logger.info("First line in file {}: {}", filePath, line);
+ Assert.assertTrue(line.startsWith("yarg")); // NOTE(review): NPE if a file is empty (readLine() returns null)
+ }
+
+ System.out.println("There are " + statuses.length + " files.");
+ Assert.assertEquals("31 files expected, found " + statuses.length,
+ 31, statuses.length);
+
+ if (!KEEP_DATA) {
+ fs.delete(outputDirPath, true);
+ }
+
+ cluster.shutdown();
+ cluster = null;
+ }
+
@AfterClass
public static void teardownClass() {
// restore system state, if needed