Posted to commits@flume.apache.org by pr...@apache.org on 2011/12/08 00:10:23 UTC
svn commit: r1211699 - in
/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src:
main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/
Author: prasadm
Date: Wed Dec 7 23:10:22 2011
New Revision: 1211699
URL: http://svn.apache.org/viewvc?rev=1211699&view=rev
Log:
Flume-871. Background HDFS append with configurable wait timeout
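The timeout is read from the sink configuration as hdfs.appendTimeout, in
milliseconds; a value of 0 or less skips the timed wait so the append blocks
until completion. As a minimal sketch, mirroring the tests below (the path and
the surrounding sink/channel setup are illustrative and elided):

    Context context = new Context();
    context.put("hdfs.path", "/flume/events");  // illustrative path
    context.put("hdfs.appendTimeout", "2000");  // wait up to 2 s per append; "0" blocks indefinitely
    Configurables.configure(sink, context);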
Modified:
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Dec 7 23:10:22 2011
@@ -25,6 +25,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -59,6 +67,7 @@ public class HDFSEventSink extends Abstr
static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
static final int defaultMaxOpenFiles = 5000;
static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
+ static final long defaultAppendTimeout = 1000;
private long rollInterval;
private long rollSize;
@@ -71,7 +80,9 @@ public class HDFSEventSink extends Abstr
private String path;
private int maxOpenFiles;
private String writeFormat;
- private HDFSWriterFactory myWriterFactory;
+ private HDFSWriterFactory myWriterFactory;
+ private ExecutorService executor;
+ private long appendTimeout;
/*
* Extended Java LinkedHashMap for open file handle LRU queue We want to clear
@@ -128,6 +139,7 @@ public class HDFSEventSink extends Abstr
String fileType = context.get("hdfs.fileType", String.class);
String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
String writeFormat = context.get("hdfs.writeFormat", String.class);
+ String appendTimeout = context.get("hdfs.appendTimeout", String.class);
if (fileName == null)
fileName = defaultFileName;
@@ -190,6 +202,12 @@ public class HDFSEventSink extends Abstr
} else {
this.writeFormat = writeFormat;
}
+
+ if (appendTimeout == null) {
+ this.appendTimeout = defaultAppendTimeout;
+ } else {
+ this.appendTimeout = Long.parseLong(appendTimeout);
+ }
}
private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -246,6 +264,50 @@ public class HDFSEventSink extends Abstr
return codec;
}
+ /*
+ * Execute the append on a separate thread and wait for completion for the specified amount of time.
+ * In case of timeout, cancel the append and throw an IOException.
+ */
+ private BucketFlushStatus backgroundAppend(final BucketWriter bw, final Event e) throws IOException, InterruptedException {
+ Future<BucketFlushStatus> future = executor.submit(new Callable<BucketFlushStatus>() {
+ public BucketFlushStatus call() throws Exception {
+ return bw.append(e);
+ }
+ });
+
+ try {
+ if (appendTimeout > 0) {
+ return future.get(appendTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ return future.get();
+ }
+ } catch (TimeoutException eT) {
+ future.cancel(true);
+ throw new IOException("Append timed out", eT);
+ } catch (ExecutionException e1) {
+ Throwable cause = e1.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ // We have a Throwable that is not an Exception (such as a
+ // NoClassDefFoundError)
+ LOG.error("Got a throwable that is not an exception! Bailing out!", cause);
+ throw new RuntimeException(cause);
+ }
+ } catch (CancellationException ce) {
+ throw new InterruptedException(
+ "Blocked append interrupted by rotation event");
+ } catch (InterruptedException ex) {
+ LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+ throw (InterruptedException) ex;
+ }
+ }
+
/**
* Pull events out of channel and send it to HDFS - take at the most
* txnEventMax, that's the maximum #events to hold in channel for a given
@@ -281,7 +343,7 @@ public class HDFSEventSink extends Abstr
}
// Write the data to HDFS
- syncedUp = bw.append(event);
+ syncedUp = backgroundAppend(bw, event);
// keep track of the files in current batch that are not flushed
// we need to flush all those at the end of the transaction
@@ -326,11 +388,22 @@ public class HDFSEventSink extends Abstr
} catch (IOException eIO) {
LOG.warn("IOException in opening file", eIO);
}
+ executor.shutdown();
+ try {
+ while (!executor.isTerminated()) {
+ executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException ex) {
+ LOG.warn("shutdown interrupted" + ex.getMessage(), ex);
+ }
+
+ executor = null;
super.stop();
}
@Override
public void start() {
+ executor = Executors.newFixedThreadPool(1);
for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
try {
e.getValue().open();
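The backgroundAppend() method added above is the standard Future-with-timeout
guard from java.util.concurrent, and start()/stop() manage the matching
executor lifecycle. A self-contained sketch of the same pattern, with
illustrative names that are not part of the Flume API:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class TimedCallSketch {
      // Single worker thread: calls are serialized, as in the sink's start()
      private final ExecutorService executor = Executors.newFixedThreadPool(1);

      <T> T callWithTimeout(Callable<T> work, long timeoutMs)
          throws IOException, InterruptedException {
        Future<T> future = executor.submit(work);
        try {
          // timeoutMs <= 0 means wait indefinitely, as in the sink's else branch
          return (timeoutMs > 0)
              ? future.get(timeoutMs, TimeUnit.MILLISECONDS)
              : future.get();
        } catch (TimeoutException te) {
          future.cancel(true); // interrupt the stuck worker thread
          throw new IOException("call timed out", te);
        } catch (ExecutionException ee) {
          // Unwrap the worker's failure so callers see the original type
          Throwable cause = ee.getCause();
          if (cause instanceof IOException) { throw (IOException) cause; }
          if (cause instanceof RuntimeException) { throw (RuntimeException) cause; }
          throw new RuntimeException(cause); // Error or unexpected checked type
        }
      }

      void shutdownAndWait(long pollMs) throws InterruptedException {
        executor.shutdown();
        while (!executor.isTerminated()) {
          executor.awaitTermination(pollMs, TimeUnit.MILLISECONDS);
        }
      }
    }

Because the executor has a single worker, a timed-out (and cancelled) append
cannot interleave with the next one on the same writer.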
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java Wed Dec 7 23:10:22 2011
@@ -33,6 +33,13 @@ public class HDFSBadDataStream extends H
if (e.getHeaders().containsKey("fault")) {
throw new IOException("Injected fault");
+ } else if (e.getHeaders().containsKey("slow")) {
+ long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException eT) {
+ throw new IOException("append interrupted", eT);
+ }
}
super.append(e, fmt);
}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java Wed Dec 7 23:10:22 2011
@@ -34,7 +34,15 @@ public class HDFSBadSeqWriter extends HD
} else if (e.getHeaders().containsKey("fault-once")) {
e.getHeaders().remove("fault-once");
throw new IOException("Injected fault");
+ } else if (e.getHeaders().containsKey("slow")) {
+ long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException eT) {
+ throw new IOException("append interrupted", eT);
+ }
}
+
super.append(e, fmt);
}
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Wed Dec 7 23:10:22 2011
@@ -21,16 +21,20 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink.Status;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.sink.FlumeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -237,6 +241,7 @@ public class TestHDFSEventSink {
Configurables.configure(channel, context);
sink.setChannel(channel);
+ sink.start();
Calendar eventDate = Calendar.getInstance();
@@ -414,6 +419,7 @@ public class TestHDFSEventSink {
Configurables.configure(channel, context);
sink.setChannel(channel);
+ sink.start();
Calendar eventDate = Calendar.getInstance();
@@ -450,4 +456,199 @@ public class TestHDFSEventSink {
sink.stop();
}
+ /*
+ * Append using a slow sink writer.
+ * Verify that process() returns BACKOFF due to the append timeout.
+ */
+ @Test
+ public void testSlowAppendFailure() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
+
+ final long txnMax = 2;
+ final String fileName = "FlumeData";
+ final long rollCount = 5;
+ final long batchSize = 2;
+ final int numBatches = 2;
+ String newPath = testPath + "/singleBucket";
+ int i = 1, j = 1;
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(newPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ // create HDFS sink with slow writer
+ HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+ sink = new HDFSEventSink(badWriterFactory);
+
+ Context context = new Context();
+ context.put("hdfs.path", newPath);
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+ context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+ Configurables.configure(sink, context);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+ sink.start();
+
+ Calendar eventDate = Calendar.getInstance();
+
+ // push the event batches into channel
+ for (i = 1; i < numBatches; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+ event.getHeaders().put("slow", "1500");
+ event.setBody(("Test." + i + "." + j).getBytes());
+ channel.put(event);
+ }
+ txn.commit();
+ txn.close();
+
+ // execute sink to process the events
+ Status status = sink.process();
+
+ // verify that the append returned backoff due to timeout
+ Assert.assertEquals(Status.BACKOFF, status);
+ }
+
+ sink.stop();
+ }
+
+ /*
+ * Append using a slow sink writer with the specified append timeout.
+ * Verify that the data is written correctly to files.
+ */
+ private void slowAppendTestHelper(long appendTimeout) throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
+ final long txnMax = 2;
+ final String fileName = "FlumeData";
+ final long rollCount = 5;
+ final long batchSize = 2;
+ final int numBatches = 2;
+ String newPath = testPath + "/singleBucket";
+ int totalEvents = 0;
+ int i = 1, j = 1;
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(newPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ // create HDFS sink with slow writer
+ HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+ sink = new HDFSEventSink(badWriterFactory);
+
+ Context context = new Context();
+ context.put("hdfs.path", newPath);
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+ context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+ context.put("hdfs.appendTimeout", String.valueOf(appendTimeout));
+ Configurables.configure(sink, context);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+ sink.start();
+
+ Calendar eventDate = Calendar.getInstance();
+
+ // push the event batches into channel
+ for (i = 1; i < numBatches; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+ event.getHeaders().put("slow", "1500");
+ event.setBody(("Test." + i + "." + j).getBytes());
+ channel.put(event);
+ totalEvents++;
+ }
+ txn.commit();
+ txn.close();
+
+ // execute sink to process the events
+ sink.process();
+ }
+
+ sink.stop();
+
+ // loop through all the files generated and check their contents
+ FileStatus[] dirStat = fs.listStatus(dirPath);
+ Path fList[] = FileUtil.stat2Paths(dirStat);
+
+ // check that the roll happened correctly for the given data
+ // Note that we'll end up with one last file containing only the header
+ Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+
+ try {
+ i = j = 1;
+ for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+ Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+ LongWritable key = new LongWritable();
+ BytesWritable value = new BytesWritable();
+ BytesWritable expValue;
+
+ while (reader.next(key, value)) {
+ expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
+ Assert.assertEquals(expValue, value);
+ if (++j > txnMax) {
+ j = 1;
+ i++;
+ }
+ }
+ reader.close();
+ }
+ } catch (IOException ioe) {
+ Assert.fail("IOException during operation: " + ioe.toString());
+ }
+ Assert.assertEquals(1, i);
+ }
+
+ /*
+ * Append using a slow sink writer with a long append timeout.
+ * Verify that the data is written correctly to files.
+ */
+ @Test
+ public void testSlowAppendWithLongTimeout() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
+ slowAppendTestHelper(3000);
+ }
+
+ /*
+ * Append using a slow sink writer with no timeout, making the append
+ * synchronous. Verify that the data is written correctly to files.
+ */
+ @Test
+ public void testSlowAppendWithoutTimeout() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
+ slowAppendTestHelper(0);
+ }
+
}
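Assuming the standard Maven/Surefire setup implied by the module paths above,
the new tests can be run in isolation with something like:

    cd flume-ng-sinks/flume-hdfs-sink
    mvn test -Dtest=TestHDFSEventSink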