Posted to commits@flume.apache.org by ar...@apache.org on 2012/05/21 18:25:22 UTC
svn commit: r1341096 - in
/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src:
main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/
Author: arvind
Date: Mon May 21 16:25:21 2012
New Revision: 1341096
URL: http://svn.apache.org/viewvc?rev=1341096&view=rev
Log:
FLUME-1219. Race conditions in BucketWriter and HDFSEventSink
(Mike Percy via Arvind Prabhakar)
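In outline, the patch moves synchronization into BucketWriter itself: the public entry points become synchronized, state shared across threads becomes volatile, and open() turns lazy so that append() drives the file lifecycle. A minimal sketch of that pattern, with hypothetical names (SketchWriter, doWrite, doClose) standing in for the real class:

    import java.io.IOException;

    // Sketch of the locking scheme adopted by this patch (names hypothetical).
    // Mutating calls are serialized on the writer instance; isOpen is volatile
    // so a close() is visible to other threads without holding the lock.
    class SketchWriter {
      private volatile boolean isOpen = false;

      public synchronized void append(byte[] body) throws IOException {
        if (!isOpen) {
          open();      // lazy open: append() drives the lifecycle
        }
        if (shouldRotate()) {
          close();     // roll before the write, so the roll timer
          open();      // roughly marks the first event in the new file
        }
        doWrite(body);
      }

      public synchronized void close() throws IOException {
        if (isOpen) {
          doClose();
          isOpen = false;  // safe to call close() repeatedly
        }
      }

      private void open() throws IOException { isOpen = true; }
      private boolean shouldRotate() { return false; }
      private void doWrite(byte[] body) throws IOException { }
      private void doClose() throws IOException { }
    }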
Modified:
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Mon May 21 16:25:21 2012
@@ -21,7 +21,6 @@ package org.apache.flume.sink.hdfs;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Context;
-
import org.apache.flume.Event;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
@@ -29,11 +28,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BucketWriter {
-
+/**
+ * Internal API intended for HDFSSink use.
+ * This class does file rolling and handles file formats and serialization.
+ * The methods in this class are NOT THREAD SAFE.
+ */
+class BucketWriter {
private static final Logger LOG = LoggerFactory
.getLogger(BucketWriter.class);
@@ -50,16 +54,18 @@ public class BucketWriter {
private long eventCounter;
private long processSize;
private long lastRollTime;
- private long batchCounter;
- private String filePath;
private long rollInterval;
private long rollSize;
private long rollCount;
private long batchSize;
private CompressionCodec codeC;
private CompressionType compType;
- private String bucketPath;
+ private FileSystem fileSystem;
private Context context;
+ private volatile String filePath;
+ private volatile String bucketPath;
+ private volatile long batchCounter;
+ private volatile boolean isOpen;
// clear the class counters
private void resetCounters() {
@@ -69,105 +75,145 @@ public class BucketWriter {
batchCounter = 0;
}
- // constructor. initialize the thresholds and open the file handle
- public BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize,
- Context ctx) throws IOException {
+ BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize,
+ Context ctx, String fPath, CompressionCodec codec, CompressionType cType,
+ HDFSWriter hWriter, FlumeFormatter fmt) {
rollInterval = rollInt;
rollSize = rollSz;
rollCount = rollCnt;
batchSize = bSize;
context = ctx;
+ filePath = fPath;
+ codeC = codec;
+ compType = cType;
+ writer = hWriter;
+ formatter = fmt;
+ isOpen = false;
- resetCounters();
- // open();
+ writer.configure(context);
}
- public void open() throws IOException {
+ /**
+ * open() is called by append()
+ * WARNING: acquires a lock on the logged-in Kerberos user object!
+ * @throws IOException
+ */
+ private void open() throws IOException {
if ((filePath == null) || (writer == null) || (formatter == null)) {
throw new IOException("Invalid file settings");
}
- writer.configure(context);
-
long counter = fileExentionCounter.incrementAndGet();
- if (codeC == null) {
- bucketPath = filePath + "." + counter;
- writer.open(bucketPath + IN_USE_EXT, formatter);
- } else {
- bucketPath = filePath + "." + counter
- + codeC.getDefaultExtension();
- writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
- }
- batchCounter = 0;
- LOG.info("Creating " + bucketPath + IN_USE_EXT);
- }
-
- public void open(String fPath, HDFSWriter hWriter, FlumeFormatter fmt)
- throws IOException {
- open(fPath, null, CompressionType.NONE, hWriter, fmt);
- }
- public void open(String fPath, CompressionCodec codec, CompressionType cType,
- HDFSWriter hWriter, FlumeFormatter fmt) throws IOException {
- filePath = fPath;
- codeC = codec;
- compType = cType;
- writer = hWriter;
- formatter = fmt;
- open();
- }
+ Configuration config = new Configuration();
+ // disable FileSystem JVM shutdown hook
+ config.setBoolean("fs.automatic.close", false);
+
+ // Hadoop is not thread safe when doing certain RPC operations,
+ // including getFileSystem(), when running under Kerberos
+ UserGroupInformation staticLogin = UserGroupInformation.getLoginUser();
+ synchronized (staticLogin) {
+ if (codeC == null) {
+ bucketPath = filePath + "." + counter;
+ // need to get reference to FS before writer does to avoid shutdown hook
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ LOG.info("Creating " + bucketPath + IN_USE_EXT);
+ writer.open(bucketPath + IN_USE_EXT, formatter);
+ } else {
+ bucketPath = filePath + "." + counter
+ + codeC.getDefaultExtension();
+ // need to get reference to FS before writer does to avoid shutdown hook
+ fileSystem = new Path(bucketPath).getFileSystem(config);
+ LOG.info("Creating " + bucketPath + IN_USE_EXT);
+ writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+ }
+ }
- // close the file handle
- public void close() throws IOException {
- LOG.debug("Closing " + bucketPath);
resetCounters();
- if (writer != null) {
- writer.close(); // could block
- }
- renameBucket();
+ isOpen = true;
}
- // close the file, ignore the IOException
- // ideally the underlying writer should discard unwritten data
- public void abort() {
- try {
- close();
- } catch (IOException ex) {
- LOG.info("Exception during close on abort", ex);
+ /**
+ * Close the file handle and rename the temp file to the permanent filename.
+ * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+ * @throws IOException On failure to rename if temp file exists.
+ */
+ public synchronized void close() throws IOException {
+ LOG.debug("Closing {}", bucketPath + IN_USE_EXT);
+ if (isOpen) {
+ try {
+ writer.close(); // could block
+ } catch (IOException e) {
+ LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
+ IN_USE_EXT + "). Exception follows.", e);
+ }
+ isOpen = false;
+ } else {
+ LOG.info("HDFSWriter is already closed: {}", bucketPath + IN_USE_EXT);
}
- try {
- open();
- } catch (IOException ex) {
- LOG.warn("Exception during opem on abort", ex);
+ if (bucketPath != null && fileSystem != null) {
+ renameBucket(); // could block or throw IOException
+ fileSystem = null;
}
}
- // flush the data
- public void flush() throws IOException {
+ /**
+ * flush the data
+ */
+ public synchronized void flush() throws IOException {
writer.sync(); // could block
batchCounter = 0;
}
- // append the data, update stats, handle roll and batching
- public void append(Event e) throws IOException {
- writer.append(e, formatter); // could block
+ /**
+ * Open file handles, write data, update stats, handle file rolling and
+ * batching / flushing. <br />
+ * If the write fails, the file is implicitly closed and then the IOException
+ * is rethrown. <br />
+ * We rotate before append, and not after, so that the lastRollTime counter
+ * that is reset by the open() call approximately reflects when the first
+ * event was written to it.
+ */
+ public synchronized void append(Event event) throws IOException {
+ if (!isOpen) {
+ open();
+ }
+
+ // check if it's time to rotate the file
+ if (shouldRotate()) {
+ close();
+ open();
+ }
+
+ // write the event
+ try {
+ writer.append(event, formatter); // could block
+ } catch (IOException e) {
+ LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
+ bucketPath + IN_USE_EXT + ") and rethrowing exception.",
+ e.getMessage());
+ try {
+ close();
+ } catch (IOException e2) {
+ LOG.warn("Caught IOException while closing file (" +
+ bucketPath + IN_USE_EXT + "). Exception follows.", e2);
+ }
+ throw e;
+ }
// update statistics
- processSize += e.getBody().length;
+ processSize += event.getBody().length;
eventCounter++;
batchCounter++;
- // check if its time to rotate the file
- if (shouldRotate()) {
- close();
- open();
- } else if ((batchCounter == batchSize)) {
+ if (batchCounter == batchSize) {
flush();
}
-
}
- // check if time to rotate the file
+ /**
+ * check if time to rotate the file
+ */
private boolean shouldRotate() {
boolean doRotate = false;
@@ -190,19 +236,16 @@ public class BucketWriter {
return doRotate;
}
- public String getFilePath() {
- return filePath;
- }
-
+ /**
+ * Rename bucketPath file from .tmp to permanent location.
+ */
private void renameBucket() throws IOException {
- Configuration conf = new Configuration();
Path srcPath = new Path(bucketPath + IN_USE_EXT);
Path dstPath = new Path(bucketPath);
- FileSystem hdfs = dstPath.getFileSystem(conf);
- if(hdfs.exists(srcPath)) { // could block
+ if(fileSystem.exists(srcPath)) { // could block
LOG.info("Renaming " + srcPath + " to " + dstPath);
- hdfs.rename(srcPath, dstPath); // could block
+ fileSystem.rename(srcPath, dstPath); // could block
}
}
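The open() path above serializes FileSystem acquisition on the Kerberos login user, since certain Hadoop RPC operations are not thread safe when running authenticated. A hedged sketch of just that step, using the same classes and config key as the patch, wrapped in a hypothetical helper:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    class FsOpenSketch {
      // Mirrors the synchronized (staticLogin) block in BucketWriter.open().
      static FileSystem openFileSystem(String bucketPath) throws IOException {
        Configuration config = new Configuration();
        // disable the FileSystem JVM shutdown hook, as the patch does
        config.setBoolean("fs.automatic.close", false);

        // Hadoop is not thread safe for certain RPC operations under
        // Kerberos, so hold a lock on the logged-in user object while
        // getting the FileSystem reference.
        UserGroupInformation staticLogin = UserGroupInformation.getLoginUser();
        synchronized (staticLogin) {
          return new Path(bucketPath).getFileSystem(config);
        }
      }
    }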
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Mon May 21 16:25:21 2012
@@ -39,7 +39,6 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
@@ -71,16 +70,10 @@ public class HDFSEventSink extends Abstr
private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
private static final int defaultMaxOpenFiles = 5000;
/**
- * Default length of time we wait for an append
- * before closing the file and moving on.
+ * Default length of time we wait for blocking BucketWriter calls
+ * before timing out the operation. Intended to prevent server hangs.
*/
- private static final long defaultAppendTimeout = 1000;
- /**
- * Default length of time we for a non-append
- * before closing the file and moving on. This
- * includes open/close/flush.
- */
- private static final long defaultCallTimeout = 5000;
+ private static final long defaultCallTimeout = 10000;
/**
* Default number of threads available for tasks
* such as append/open/close/flush with hdfs.
@@ -113,7 +106,6 @@ public class HDFSEventSink extends Abstr
private int maxOpenFiles;
private String writeFormat;
private ExecutorService executor;
- private long appendTimeout;
private String kerbConfPrincipal;
private String kerbKeytab;
@@ -213,7 +205,6 @@ public class HDFSEventSink extends Abstr
fileType = context.getString("hdfs.fileType", defaultFileType);
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
writeFormat = context.getString("hdfs.writeFormat");
- appendTimeout = context.getLong("hdfs.appendTimeout", defaultAppendTimeout);
callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
@@ -390,15 +381,16 @@ public class HDFSEventSink extends Abstr
* Pull events out of channel and send it to HDFS - take at the most
* txnEventMax, that's the maximum #events to hold in channel for a given
* transaction - find the corresponding bucket for the event, ensure the file
- * is open - extract the pay-load and append to HDFS file
+ * is open - extract the pay-load and append to HDFS file <br />
+ * WARNING: NOT THREAD SAFE
*/
@Override
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
List<BucketWriter> writers = Lists.newArrayList();
+ transaction.begin();
try {
- transaction.begin();
Event event = null;
for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
event = channel.take();
@@ -413,26 +405,15 @@ public class HDFSEventSink extends Abstr
// we haven't seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
- final HDFSWriter writer = writerFactory.getWriter(fileType);
- final FlumeFormatter formatter = HDFSFormatterFactory
+
+ HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
+ FlumeFormatter formatter = HDFSFormatterFactory
.getFormatter(writeFormat);
- bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, batchSize, context);
- final BucketWriter callableWriter = bucketWriter;
- final String callablePath = realPath;
- final CompressionCodec callableCodec = codeC;
- final CompressionType callableCompType = compType;
-
- callWithTimeout(executor, callTimeout,
- new ProxyCallable<Void>(proxyTicket) {
- @Override
- public Void doCall() throws Exception {
- synchronized(callableWriter) {
- callableWriter.open(callablePath, callableCodec,
- callableCompType, writer, formatter);
- }
- return null;
- }
- });
+
+ bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
+ batchSize, context, realPath, codeC, compType, hdfsWriter,
+ formatter);
+
sfWriters.put(realPath, bucketWriter);
}
@@ -444,19 +425,28 @@ public class HDFSEventSink extends Abstr
// Write the data to HDFS
final BucketWriter callableWriter = bucketWriter;
final Event callableEvent = event;
- callWithTimeout(executor, appendTimeout,
+ callWithTimeout(executor, callTimeout,
new ProxyCallable<Void>(proxyTicket) {
@Override
public Void doCall() throws Exception {
- synchronized(callableWriter) {
- try {
- callableWriter.append(callableEvent);
- } catch(IOException ex) {
- callableWriter.abort(); // close/open
- callableWriter.append(callableEvent); // retry
- }
- return null;
- }
+ callableWriter.append(callableEvent);
+ return null;
+ }
+ });
+ }
+
+ // flush all pending buckets before committing the transaction
+ for (BucketWriter writer : writers) {
+ if (writer.isBatchComplete()) {
+ continue;
+ }
+ final BucketWriter callableWriter = writer;
+ callWithTimeout(executor, callTimeout,
+ new ProxyCallable<Void>(proxyTicket) {
+ @Override
+ public Void doCall() throws Exception {
+ callableWriter.flush();
+ return null;
}
});
}
@@ -479,45 +469,28 @@ public class HDFSEventSink extends Abstr
throw new EventDeliveryException(th);
}
} finally {
- // flush the buckets that still has pending data
- // this ensures that the data removed from channel
- // by the current transaction is safely on disk
- for (BucketWriter writer : writers) {
- if (writer.isBatchComplete()) {
- continue;
- }
- final BucketWriter callableWriter = writer;
- callWithTimeoutLogError(executor, callTimeout, "flush on "
- + callableWriter, new ProxyCallable<Void>(proxyTicket) {
- @Override
- public Void doCall() throws Exception {
- synchronized(callableWriter) {
- callableWriter.flush();
- }
- return null;
- }
- });
- }
transaction.close();
}
}
@Override
public void stop() {
- for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
- LOG.info("Closing " + e.getKey());
- final BucketWriter callableWriter = e.getValue();
- callWithTimeoutLogError(executor, callTimeout, "close on " + e.getKey(),
- new ProxyCallable<Void>(proxyTicket) {
+ // do not constrain close() calls with a timeout
+ for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
+ LOG.info("Closing " + entry.getKey());
+ final BucketWriter callableWriter = entry.getValue();
+ callWithTimeoutLogError(executor, callTimeout, "close on " +
+ entry.getKey(), new ProxyCallable<Void>(proxyTicket) {
+
@Override
public Void doCall() throws Exception {
- synchronized(callableWriter) {
- callableWriter.close();
- }
+ callableWriter.close();
return null;
}
});
}
+
+ sfWriters.clear();
executor.shutdown();
try {
while (executor.isTerminated() == false) {
@@ -534,31 +507,6 @@ public class HDFSEventSink extends Abstr
@Override
public void start() {
executor = Executors.newFixedThreadPool(threadsPoolSize);
- // FIXME: if restarted, the code below reopens the writers from the
- // previous "process" executions. Is this what we want? Why
- // not clear this list during close since the appropriate
- // writers will be opened in "process"?
- // TODO: check if this has anything to do with renaming .tmp files.
- // If not, remove this code.
- for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
- final BucketWriter callableWriter = entry.getValue();
- try {
- callWithTimeout(executor, callTimeout,
- new ProxyCallable<Void>(proxyTicket) {
- @Override
- public Void doCall() throws Exception {
- synchronized(callableWriter) {
- callableWriter.open();
- }
- return null;
- }
- });
- } catch (IOException e) {
- throw new FlumeException("Exception opening HDFS file.", e);
- } catch (InterruptedException e) {
- throw new FlumeException("Interrupt while opening HDFS file.", e);
- }
- }
super.start();
}
@@ -624,12 +572,15 @@ public class HDFSEventSink extends Abstr
if (curUser == null || !curUser.getUserName().equals(principal)) {
try {
// static login
- kerberosLogin(principal, kerbKeytab);
+ kerberosLogin(this, principal, kerbKeytab);
} catch (IOException e) {
LOG.error("Authentication or file read error while attempting to "
+ "login as kerberos principal (" + principal + ") using "
+ "keytab (" + kerbKeytab + "). Exception follows.", e);
+ return false;
}
+ } else {
+ LOG.debug("{}: Using existing principal login: {}", this, curUser);
}
// we supposedly got through this unscathed... so store the static user
@@ -706,7 +657,7 @@ public class HDFSEventSink extends Abstr
* @throws IOException if login fails.
*/
private static synchronized UserGroupInformation kerberosLogin(
- String principal, String keytab) throws IOException {
+ HDFSEventSink sink, String principal, String keytab) throws IOException {
// if we are the 2nd user thru the lock, the login should already be
// available statically if login was successful
@@ -719,11 +670,16 @@ public class HDFSEventSink extends Abstr
LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
}
- // this means we really actually have not logged in successfully yet
- if (curUser == null || !curUser.getUserName().equals(principal)) {
+ // we already have logged in successfully
+ if (curUser != null && curUser.getUserName().equals(principal)) {
+ LOG.debug("{}: Using existing principal ({}): {}",
+ new Object[] { sink, principal, curUser });
+
+ // no principal found
+ } else {
- LOG.info("Attempting kerberos login as principal (" + principal + ") " +
- "from keytab file (" + keytab + ")");
+ LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
+ "file ({})", new Object[] { sink, principal, keytab });
// attempt static kerberos login
UserGroupInformation.loginUserFromKeytab(principal, keytab);
@@ -733,4 +689,10 @@ public class HDFSEventSink extends Abstr
return curUser;
}
+ @Override
+ public String toString() {
+ return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
+ " }";
+ }
+
}
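Every blocking BucketWriter call in the sink above now shares the single hdfs.callTimeout budget (default raised to 10000 ms). The patch routes these through callWithTimeout with a ProxyCallable carrying the Kerberos proxy ticket; a minimal sketch of the underlying timeout pattern with plain java.util.concurrent, proxy ticket omitted:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class CallWithTimeoutSketch {
      // Run a blocking HDFS call on the sink's executor and give up after
      // timeoutMs instead of hanging the sink thread on a stuck operation.
      static <T> T callWithTimeout(ExecutorService executor, long timeoutMs,
          Callable<T> callable) throws Exception {
        Future<T> future = executor.submit(callable);
        try {
          return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the stuck call before rethrowing
          throw e;
        }
      }
    }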
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java Mon May 21 16:25:21 2012
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.io.SequenceFile;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,19 +36,14 @@ public class TestBucketWriter {
LoggerFactory.getLogger(TestBucketWriter.class);
private Context ctx = new Context();
- @Before
- public void setup() {
-
- }
-
@Test
public void testEventCountingRoller() throws IOException {
int maxEvents = 100;
- BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx);
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
HDFSTextFormatter formatter = new HDFSTextFormatter();
-
- bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+ BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
+ "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+ formatter);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -59,19 +54,19 @@ public class TestBucketWriter {
logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
- Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
- Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
- Assert.assertEquals(hdfsWriter.getFilesOpened(), 11);
+ Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+ Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+ Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
@Test
public void testSizeRoller() throws IOException {
int maxBytes = 300;
- BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx);
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
HDFSTextFormatter formatter = new HDFSTextFormatter();
-
- bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+ BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
+ "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+ formatter);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
@@ -82,19 +77,19 @@ public class TestBucketWriter {
logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
- Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
- Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
- Assert.assertEquals(hdfsWriter.getFilesOpened(), 11);
+ Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+ Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+ Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
@Test
public void testIntervalRoller() throws IOException, InterruptedException {
int rollInterval = 2; // seconds
- BucketWriter bucketWriter = new BucketWriter(rollInterval, 0, 0, 0, ctx);
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
HDFSTextFormatter formatter = new HDFSTextFormatter();
-
- bucketWriter.open("/tmp/file", hdfsWriter, formatter);
+ BucketWriter bucketWriter = new BucketWriter(rollInterval, 0, 0, 0, ctx,
+ "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+ formatter);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
long startNanos = System.nanoTime();
@@ -114,9 +109,10 @@ public class TestBucketWriter {
logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
- Assert.assertEquals(hdfsWriter.getEventsWritten(), 1000);
- Assert.assertEquals(hdfsWriter.getBytesWritten(), 3000);
- Assert.assertEquals(hdfsWriter.getFilesOpened(), elapsedSeconds/2 + 1);
+ Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
+ Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
+ Assert.assertEquals("files opened", elapsedSeconds/2 + 1,
+ hdfsWriter.getFilesOpened());
}
}
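The expected file counts in these tests drop from 11 to 10 because open() is now deferred to the first append(): no file is created at construction time, so the count is simply events divided by the roll threshold. A tiny check of that arithmetic, with values assumed from testEventCountingRoller above:

    class RollCountSketch {
      public static void main(String[] args) {
        int totalEvents = 1000;
        int rollCount = 100; // maxEvents in the test
        // With lazy open(), files == rolls; the old eager open() added one.
        int expectedFiles = totalEvents / rollCount; // 10, not 11
        System.out.println("expected files: " + expectedFiles);
      }
    }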
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1341096&r1=1341095&r2=1341096&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Mon May 21 16:25:21 2012
@@ -17,9 +17,6 @@
*/
package org.apache.flume.sink.hdfs;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -57,8 +54,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,8 +176,7 @@ public class TestHDFSEventSink {
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- // Note that we'll end up with one last file with only header
- Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+ Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
// check the contents of the all files
verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -295,8 +289,7 @@ public class TestHDFSEventSink {
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- // Note that we'll end up with one last file with only header
- Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+ Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -374,8 +367,7 @@ public class TestHDFSEventSink {
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- // Note that we'll end up with one last file with only header
- Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+ Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -450,8 +442,7 @@ public class TestHDFSEventSink {
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- // Note that we'll end up with two files which has have a header
- Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+ Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -623,13 +614,21 @@ public class TestHDFSEventSink {
BytesWritable value = new BytesWritable();
while(reader.next(key, value)) {
String body = new String(value.getBytes(), 0, value.getLength());
- bodies.remove(body);
- found++;
+ if (bodies.contains(body)) {
+ LOG.debug("Found event body: {}", body);
+ bodies.remove(body);
+ found++;
+ }
}
reader.close();
}
}
- assertTrue("Found = " + found + ", Expected = " +
+ if (!bodies.isEmpty()) {
+ for (String body : bodies) {
+ LOG.error("Never found event body: {}", body);
+ }
+ }
+ Assert.assertTrue("Found = " + found + ", Expected = " +
expected + ", Left = " + bodies.size() + " " + bodies,
bodies.size() == 0);
@@ -651,7 +650,7 @@ public class TestHDFSEventSink {
reader.close();
}
}
- assertTrue("Found = " + found + ", Expected = " +
+ Assert.assertTrue("Found = " + found + ", Expected = " +
expected + ", Left = " + bodies.size() + " " + bodies,
bodies.size() == 0);
@@ -681,25 +680,27 @@ public class TestHDFSEventSink {
input.close();
}
}
- assertTrue("Found = " + found + ", Expected = " +
+ Assert.assertTrue("Found = " + found + ", Expected = " +
expected + ", Left = " + bodies.size() + " " + bodies,
bodies.size() == 0);
}
/**
* Ensure that when a write throws an IOException we are
- * able to continue to progress (via close/open).
+ * able to continue to progress in the next process() call.
+ * This relies on Transactional rollback semantics for durability and
+ * the behavior of the BucketWriter class of close()ing upon IOException.
*/
@Test
public void testCloseReopen() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
+ final int numBatches = 4;
final long txnMax = 25;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
- final int numBatches = 4;
String newPath = testPath + "/singleBucket";
int i = 1, j = 1;
@@ -724,29 +725,8 @@ public class TestHDFSEventSink {
Configurables.configure(sink, context);
- Channel channel = mock(Channel.class);
- final List<Event> events = Lists.newArrayList();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- events.add((Event)args[0]);
- return null;
- }
- }).when(channel).put(any(Event.class));
-
- when(channel.take()).then(new Answer<Event>() {
- @Override
- public Event answer(InvocationOnMock invocation) throws Throwable {
- if(events.isEmpty()) {
- return null;
- }
- return events.remove(0);
- }
- });
- when(channel.getTransaction()).thenReturn(mock(Transaction.class));
-
- Configurables.configure(channel, context);
+ MemoryChannel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
sink.setChannel(channel);
sink.start();
@@ -755,19 +735,25 @@ public class TestHDFSEventSink {
List<String> bodies = Lists.newArrayList();
// push the event batches into channel
for (i = 1; i < numBatches; i++) {
- for (j = 1; j <= txnMax; j++) {
- Event event = new SimpleEvent();
- eventDate.clear();
- eventDate.set(2011, i, i, i, 0); // yy mm dd
- event.getHeaders().put("timestamp",
- String.valueOf(eventDate.getTimeInMillis()));
- event.getHeaders().put("hostname", "Host" + i);
- String body = "Test." + i + "." + j;
- event.setBody(body.getBytes());
- bodies.add(body);
- // inject fault
- event.getHeaders().put("fault-until-reopen", "");
- channel.put(event);
+ channel.getTransaction().begin();
+ try {
+ for (j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // yy mm dd
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+ String body = "Test." + i + "." + j;
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ // inject fault
+ event.getHeaders().put("fault-until-reopen", "");
+ channel.put(event);
+ }
+ channel.getTransaction().commit();
+ } finally {
+ channel.getTransaction().close();
}
LOG.info("execute sink to process the events: " + sink.process());
}
@@ -812,6 +798,7 @@ public class TestHDFSEventSink {
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+ context.put("hdfs.callTimeout", Long.toString(1000));
Configurables.configure(sink, context);
Channel channel = new MemoryChannel();