You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/12 21:35:58 UTC
svn commit: r1182552 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/channel/
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/...
Author: arvind
Date: Wed Oct 12 19:35:57 2011
New Revision: 1182552
URL: http://svn.apache.org/viewvc?rev=1182552&view=rev
Log:
FLUME-788. Add more test cases for Flume NG HDFS sink.
(Prasad Mujumdar via Arvind Prabhakar)
Added:
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java Wed Oct 12 19:35:57 2011
@@ -226,7 +226,8 @@ public class MultiOpMemChannel implement
StampedEvent undoEvent;
StampedEvent currentEvent;
- while ((undoEvent = myTxn.getUndoPutList().removeLast()) != null) {
+ while ((myTxn.getUndoPutList().isEmpty()) == false) {
+ undoEvent = myTxn.getUndoPutList().removeLast();
currentEvent = queue.removeLast();
Preconditions.checkNotNull(currentEvent, "Rollback error");
Preconditions.checkArgument(currentEvent == undoEvent ,
@@ -267,8 +268,9 @@ public class MultiOpMemChannel implement
*/
protected void undoTake(MemTransaction myTxn) {
StampedEvent e;
-
- while ((e = myTxn.getUndoTakeList().removeLast()) != null) {
+
+ while (myTxn.getUndoTakeList().isEmpty() == false) {
+ e = myTxn.getUndoTakeList().removeLast();
queue.addFirst(e);
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Wed Oct 12 19:35:57 2011
@@ -105,9 +105,11 @@ public class BucketWriter {
}
// close the file, ignore the IOException
+ // ideally the underlying writer should discard unwritten data
public void abort() {
try {
close();
+ open();
} catch (IOException eIO) {
// Ignore it
}
@@ -167,11 +169,11 @@ public class BucketWriter {
if ((rollInterval > 0)
&& (rollInterval < (System.currentTimeMillis() - lastProcessTime) / 1000))
doRotate = true;
- if ((rollCount > 0) && (rollCount < eventCounter)) {
+ if ((rollCount > 0) && (rollCount <= eventCounter)) {
eventCounter = 0;
doRotate = true;
}
- if ((rollSize > 0) && (rollSize < processSize)) {
+ if ((rollSize > 0) && (rollSize <= processSize)) {
processSize = 0;
doRotate = true;
}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Oct 12 19:35:57 2011
@@ -71,6 +71,7 @@ public class HDFSEventSink extends Abstr
private String path;
private int maxOpenFiles;
private String writeFormat;
+ private HDFSWriterFactory myWriterFactory;
/*
* Extended Java LinkedHashMap for open file handle LRU queue We want to clear
@@ -106,7 +107,11 @@ public class HDFSEventSink extends Abstr
// private boolean shouldSub = false;
public HDFSEventSink() {
-
+ myWriterFactory = new HDFSWriterFactory();
+ }
+
+ public HDFSEventSink(HDFSWriterFactory newWriterFactory) {
+ myWriterFactory = newWriterFactory;
}
// read configuration and setup thresholds
@@ -267,7 +272,7 @@ public class HDFSEventSink extends Abstr
// we haven't seen this file yet, so open it and cache the handle
if (bw == null) {
- HDFSWriter writer = HDFSWriterFactory.getWriter(fileType);
+ HDFSWriter writer = myWriterFactory.getWriter(fileType);
FlumeFormatter formatter = HDFSFormatterFactory
.getFormatter(writeFormat);
bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
@@ -296,7 +301,7 @@ public class HDFSEventSink extends Abstr
return Status.READY;
} catch (IOException eIO) {
transaction.rollback();
- LOG.error("HDFS IO error", eIO);
+ LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Exception e) {
transaction.rollback();
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java Wed Oct 12 19:35:57 2011
@@ -47,7 +47,9 @@ public class HDFSTextFormatter implement
@Override
public byte[] getBytes(Event e) {
- return makeText(e).getBytes();
+ Text record = makeText(e);
+ record.append("\n".getBytes(), 0, 1);
+ return record.getBytes();
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java Wed Oct 12 19:35:57 2011
@@ -20,7 +20,7 @@ package org.apache.flume.sink.hdfs;
import java.io.IOException;
-abstract class HDFSWriterFactory {
+public class HDFSWriterFactory {
static final String SequenceFileType = "SequenceFile";
static final String DataStreamType = "DataStream";
static final String CompStreamType = "CompressedStream";
@@ -29,7 +29,7 @@ abstract class HDFSWriterFactory {
}
- public static HDFSWriter getWriter(String fileType) throws IOException {
+ public HDFSWriter getWriter(String fileType) throws IOException {
if (fileType == SequenceFileType) {
return new HDFSSequenceFile();
} else if (fileType == DataStreamType) {
Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+
+/**
+ * Test-only DataStream writer that fails appends on demand: always for events
+ * carrying a "fault" header, and once for events carrying "fault-once".
+ */
+public class HDFSBadDataStream extends HDFSDataStream {
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+    if (e.getHeaders().containsKey("fault")) {
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-once")) {
+      e.getHeaders().remove("fault-once"); // consumed: a retry is not faulted
+      throw new IOException("Injected fault");
+    }
+    super.append(e, fmt);
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,22 @@
+package org.apache.flume.sink.hdfs;
+
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+
+public class HDFSBadSeqWriter extends HDFSSequenceFile {
+ @Override
+ public void append(Event e, FlumeFormatter fmt) throws IOException {
+
+ if (e.getHeaders().containsKey("fault")) {
+ throw new IOException("Injected fault");
+ } else if (e.getHeaders().containsKey("fault-once")) {
+ e.getHeaders().remove("fault-once");
+ throw new IOException("Injected fault");
+ }
+ super.append(e, fmt);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java?rev=1182552&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java Wed Oct 12 19:35:57 2011
@@ -0,0 +1,22 @@
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+/** Test factory that hands out fault-injecting HDFS writers. */
+public class HDFSBadWriterFactory extends HDFSWriterFactory {
+  static final String BadSequenceFileType = "SequenceFile";
+  static final String BadDataStreamType = "DataStream";
+  static final String BadCompStreamType = "CompressedStream";
+
+  @Override
+  public HDFSWriter getWriter(String fileType) throws IOException {
+    // compare by value: configured strings are not guaranteed to be interned
+    if (BadSequenceFileType.equals(fileType)) {
+      return new HDFSBadSeqWriter();
+    } else if (BadDataStreamType.equals(fileType)) {
+      return new HDFSBadDataStream();
+    } else {
+      throw new IOException("File type " + fileType + " not supported");
+    }
+  }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1182552&r1=1182551&r2=1182552&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Wed Oct 12 19:35:57 2011
@@ -17,31 +17,60 @@
*/
package org.apache.flume.sink.hdfs;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.Calendar;
+import junit.framework.Assert;
+
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.MultiOpMemChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.sink.hdfs.HDFSEventSink;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.flume.sink.hdfs.HDFSBadWriterFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class TestHDFSEventSink {
+public class TestHDFSEventSink extends TestCase {
private HDFSEventSink sink;
private String testPath;
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
+
+  /** Best-effort recursive delete of testPath; failures are only logged. */
+  private void dirCleanup() {
+    Configuration conf = new Configuration();
+    try {
+      FileSystem.get(conf).delete(new Path(testPath), true);
+    } catch (IOException eIO) {
+      // cleanup must never fail a test, so just report the problem
+      LOG.warn("IO Error in test cleanup", eIO);
+    }
+  }
+ // TODO: use System.getProperty("file.separator") instead of hardcoded '/'
@Before
public void setUp() {
/*
@@ -50,12 +79,17 @@ public class TestHDFSEventSink {
* Hadoop config points at file:/// rather than hdfs://. We need to find a
* better way of testing HDFS related functionality.
*/
- testPath = "/user/flume/testdata";
+ testPath = "file:///tmp/fluem-test." + Calendar.getInstance().getTimeInMillis()
+ + "." + Thread.currentThread().getId();
+
sink = new HDFSEventSink();
+ dirCleanup();
}
@After
public void tearDown() {
+ if( System.getenv("hdfs_keepFiles") == null)
+ dirCleanup();
}
@Test
@@ -75,28 +109,34 @@ public class TestHDFSEventSink {
}
@Test
- public void testAppend() throws InterruptedException, LifecycleException,
+ public void testTextAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
final long txnMax = 25;
final long rollCount = 3;
final long batchSize = 2;
final String fileName = "FlumeData";
+ String newPath = testPath + "/singleTextBucket";
+ int totalEvents = 0;
+ int i=1,j=1;
// clear the test directory
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
- Path dirPath = new Path(testPath);
+ Path dirPath = new Path(newPath);
fs.delete(dirPath, true);
fs.mkdirs(dirPath);
Context context = new Context();
- context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+ context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
+ context.put("hdfs.writeFormat","Text");
+ context.put("hdfs.fileType", "DataStream");
Configurables.configure(sink, context);
@@ -111,9 +151,9 @@ public class TestHDFSEventSink {
Calendar eventDate = Calendar.getInstance();
// push the event batches into channel
- for (int i = 1; i < 4; i++) {
+ for (i = 1; i < 4; i++) {
txn.begin();
- for (int j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= txnMax; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -123,6 +163,7 @@ public class TestHDFSEventSink {
event.setBody(("Test." + i + "." + j).getBytes());
channel.put(event);
+ totalEvents++;
}
txn.commit();
@@ -132,25 +173,138 @@ public class TestHDFSEventSink {
sink.stop();
- /*
- *
- * // loop through all the files generated and check their contains
- * FileStatus[] dirStat = fs.listStatus(dirPath); Path fList[] =
- * FileUtil.stat2Paths(dirStat);
- *
- * try { for (int cnt = 0; cnt < fList.length; cnt++) { SequenceFile.Reader
- * reader = new SequenceFile.Reader(fs, fList[cnt], conf); LongWritable key
- * = new LongWritable(); BytesWritable value = new BytesWritable();
- *
- * while (reader.next(key, value)) { logger.info(key+ ":" +
- * value.toString()); } reader.close(); } } catch (IOException ioe) {
- * System.err.println("IOException during operation: " + ioe.toString());
- * System.exit(1); }
- */
+ // loop through all the files generated and check their contains
+ FileStatus[] dirStat = fs.listStatus(dirPath);
+ Path fList[] = FileUtil.stat2Paths(dirStat);
+
+ // check that the roll happened correctly for the given data
+ // Note that we'll end up with one last file with only header
+ Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+
+ try {
+ i = j = 1;
+ for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+ Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+ FSDataInputStream input = fs.open(filePath);
+ BufferedReader d = new BufferedReader(new InputStreamReader(input));
+ String line;
+
+ while ((line = d.readLine()) != null) {
+ Assert.assertEquals(line, ("Test." + i + "." + j));
+ if ( ++j > txnMax) {
+ j = 1;
+ i++;
+ }
+ }
+ input.close();
+ }
+ } catch (IOException ioe) {
+ System.err.println("IOException during operation: " + ioe.toString());
+ return;
+ }
+ Assert.assertEquals(i, 4);
}
@Test
- public void testTextAppend() throws InterruptedException, LifecycleException,
+ public void testSimpleAppend() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+
+ final long txnMax = 25;
+ final String fileName = "FlumeData";
+ final long rollCount = 5;
+ final long batchSize = 2;
+ final int numBatches = 4;
+ String newPath = testPath + "/singleBucket";
+ int totalEvents = 0;
+ int i=1,j=1;
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(newPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ Context context = new Context();
+
+ context.put("hdfs.path", newPath);
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+
+ Configurables.configure(sink, context);
+
+ Channel channel = new MultiOpMemChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+
+ Calendar eventDate = Calendar.getInstance();
+
+ // push the event batches into channel
+ for (i = 1; i < numBatches; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // yy mm dd
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+
+ event.setBody(("Test." + i + "." + j).getBytes());
+ channel.put(event);
+ totalEvents ++;
+ }
+ txn.commit();
+ txn.close();
+
+ // execute sink to process the events
+ sink.process();
+ }
+
+ sink.stop();
+
+
+ // loop through all the files generated and check their contains
+ FileStatus[] dirStat = fs.listStatus(dirPath);
+ Path fList[] = FileUtil.stat2Paths(dirStat);
+
+ // check that the roll happened correctly for the given data
+ // Note that we'll end up with one last file with only header
+ Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+
+ try {
+ i = j = 1;
+ for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+ Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+ LongWritable key = new LongWritable();
+ BytesWritable value = new BytesWritable();
+ BytesWritable expValue;
+
+ while (reader.next(key, value)) {
+ expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
+ Assert.assertEquals(expValue, value);
+ if ( ++j > txnMax) {
+ j = 1;
+ i++;
+ }
+ }
+ reader.close();
+ }
+ } catch (IOException ioe) {
+ System.err.println("IOException during operation: " + ioe.toString());
+ System.exit(1);
+ }
+ Assert.assertEquals(i, 4);
+
+ }
+
+ @Test
+ public void testAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
final long txnMax = 25;
@@ -172,8 +326,6 @@ public class TestHDFSEventSink {
context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
- context.put("hdfs.writeFormat","Text");
- context.put("hdfs.fileType", "DataStream");
Configurables.configure(sink, context);
@@ -226,5 +378,80 @@ public class TestHDFSEventSink {
*/
}
-
+
+ // inject fault and make sure that the txn is rolled back and retried
+ @Test
+ public void testBadSimpleAppend() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ // SequenceFile appends through HDFSBadWriterFactory, with one-shot injected faults
+ final long txnMax = 25;
+ final String fileName = "FlumeData";
+ final long rollCount = 5;
+ final long batchSize = 2;
+ final int numBatches = 4;
+ String newPath = testPath + "/singleBucket";
+ int totalEvents = 0;
+ int i=1,j=1;
+
+ HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+ sink = new HDFSEventSink(badWriterFactory);
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(newPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ Context context = new Context();
+
+ context.put("hdfs.path", newPath);
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+ context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+
+ Configurables.configure(sink, context);
+
+ Channel channel = new MultiOpMemChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+
+ Calendar eventDate = Calendar.getInstance();
+
+ // push the event batches into channel
+ for (i = 1; i < numBatches; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // year, month (0-based), day, hour, minute
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+
+ event.setBody(("Test." + i + "." + j).getBytes());
+ // mark events at running index 1, 31, 61 to fail on their first append only
+ if ((totalEvents % 30) == 1) {
+ event.getHeaders().put("fault-once", "");
+ }
+ channel.put(event);
+ totalEvents ++;
+ }
+ txn.commit();
+ txn.close();
+
+ // execute sink to process the events
+ sink.process();
+ }
+ LOG.info("clear any events pending due to errors");
+ // extra pass to drain events returned to the channel by rolled-back txns
+ sink.process();
+
+ sink.stop();
+ } // NOTE(review): no assertions; only checks that the fault path does not throw
+
}