Posted to commits@flume.apache.org by pr...@apache.org on 2011/12/08 00:10:23 UTC

svn commit: r1211699 - in /incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src: main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/

Author: prasadm
Date: Wed Dec  7 23:10:22 2011
New Revision: 1211699

URL: http://svn.apache.org/viewvc?rev=1211699&view=rev
Log:
Flume-871. Background HDFS append with configurable wait timeout
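
For context, a minimal, self-contained sketch of the pattern this change applies (the class and
method names below are hypothetical and not part of the patch): run the blocking append on a
single background thread, wait up to a configurable number of milliseconds, and cancel the append
on timeout so the sink can back off. A timeout of 0 falls back to an unbounded wait, mirroring the
hdfs.appendTimeout handling in the diff below.

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedAppendSketch {

      private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);

      // Hypothetical stand-in for a blocking HDFS append call.
      static void slowAppend(long millis) throws InterruptedException {
        Thread.sleep(millis);
      }

      // Run the append in the background and wait at most timeoutMillis (0 = wait forever).
      static boolean appendWithTimeout(final long appendMillis, long timeoutMillis)
          throws InterruptedException, ExecutionException {
        Future<Void> future = EXECUTOR.submit(new Callable<Void>() {
          public Void call() throws Exception {
            slowAppend(appendMillis);
            return null;
          }
        });
        try {
          if (timeoutMillis > 0) {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
          } else {
            future.get();
          }
          return true;                 // append completed in time
        } catch (TimeoutException te) {
          future.cancel(true);         // interrupt the stuck append
          return false;                // caller can back off and retry the transaction
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println("fast append ok: " + appendWithTimeout(100, 1000));   // true
        System.out.println("slow append ok: " + appendWithTimeout(1500, 1000));  // false
        EXECUTOR.shutdownNow();
      }
    }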

Modified:
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Dec  7 23:10:22 2011
@@ -25,6 +25,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -59,6 +67,7 @@ public class HDFSEventSink extends Abstr
   static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   static final int defaultMaxOpenFiles = 5000;
   static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
+  static final long defaultAppendTimeout = 1000;
 
   private long rollInterval;
   private long rollSize;
@@ -71,7 +80,9 @@ public class HDFSEventSink extends Abstr
   private String path;
   private int maxOpenFiles;
   private String writeFormat;
-  private HDFSWriterFactory myWriterFactory; 
+  private HDFSWriterFactory myWriterFactory;
+  private ExecutorService executor;
+  private long appendTimeout;
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue We want to clear
@@ -128,6 +139,7 @@ public class HDFSEventSink extends Abstr
     String fileType = context.get("hdfs.fileType", String.class);
     String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
     String writeFormat = context.get("hdfs.writeFormat", String.class);
+    String appendTimeout = context.get("hdfs.appendTimeout", String.class);
 
     if (fileName == null)
       fileName = defaultFileName;
@@ -190,6 +202,12 @@ public class HDFSEventSink extends Abstr
     } else {
       this.writeFormat = writeFormat;
     }
+
+    if (appendTimeout == null) {
+      this.appendTimeout = defaultAppendTimeout;
+    } else {
+      this.appendTimeout = Long.parseLong(appendTimeout);
+    }
   }
 
   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -246,6 +264,50 @@ public class HDFSEventSink extends Abstr
     return codec;
   }
 
+  /*
+   * Execute the append on a separate thread and wait up to the specified amount
+   * of time for it to complete; on timeout, cancel the append and throw an IOException.
+   */
+  private BucketFlushStatus backgroundAppend(final BucketWriter bw, final Event e) throws IOException, InterruptedException {
+    Future<BucketFlushStatus> future = executor.submit(new Callable<BucketFlushStatus>() {
+      public BucketFlushStatus call() throws Exception {
+        return bw.append(e);
+      }
+    });
+
+    try {
+      if (appendTimeout > 0) {
+        return future.get(appendTimeout, TimeUnit.MILLISECONDS);
+      } else {
+        return future.get();
+      }
+    } catch (TimeoutException eT) {
+      future.cancel(true);
+      throw new IOException("Append timed out", eT);
+    } catch (ExecutionException e1) {
+      Throwable cause = e1.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else {
+        // we have a throwable that is not an exception. (such as a
+        // NoClassDefFoundError)
+        LOG.error("Got a throwable that is not an exception! Bailing out!",
+            e1.getCause());
+        throw new RuntimeException(e1.getCause());
+      }
+    } catch (CancellationException ce) {
+      throw new InterruptedException(
+          "Blocked append interrupted by rotation event");
+    } catch (InterruptedException ex) {
+      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+      throw (InterruptedException) ex;
+    }
+  }
+
   /**
    * Pull events out of channel and send it to HDFS - take at the most
    * txnEventMax, that's the maximum #events to hold in channel for a given
@@ -281,7 +343,7 @@ public class HDFSEventSink extends Abstr
         }
 
         // Write the data to HDFS
-        syncedUp = bw.append(event);
+        syncedUp = backgroundAppend(bw, event);
 
         // keep track of the files in current batch that are not flushed
         // we need to flush all those at the end of the transaction
@@ -326,11 +388,22 @@ public class HDFSEventSink extends Abstr
     } catch (IOException eIO) {
       LOG.warn("IOException in opening file", eIO);
     }
+    executor.shutdown();
+    try {
+      while (executor.isTerminated() == false) {
+        executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS);
+      }
+    } catch (InterruptedException ex) {
+      LOG.warn("shutdown interrupted" + ex.getMessage(), ex);
+    }
+
+    executor = null;
     super.stop();
   }
 
   @Override
   public void start() {
+    executor = Executors.newFixedThreadPool(1);
     for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
       try {
         e.getValue().open();

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java Wed Dec  7 23:10:22 2011
@@ -33,6 +33,13 @@ public class HDFSBadDataStream extends H
 
       if (e.getHeaders().containsKey("fault")) {
         throw new IOException("Injected fault");
+      } else if (e.getHeaders().containsKey("slow")) {
+        long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException eT) {
+          throw new IOException("append interrupted", eT);
+        }
       }
       super.append(e, fmt);
     }

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java Wed Dec  7 23:10:22 2011
@@ -34,7 +34,15 @@ public class HDFSBadSeqWriter extends HD
     } else if (e.getHeaders().containsKey("fault-once")) {
       e.getHeaders().remove("fault-once");
       throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("slow")) {
+      long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException eT) {
+        throw new IOException("append interrupted", eT);
+      }
     }
+
     super.append(e, fmt);
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1211699&r1=1211698&r2=1211699&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Wed Dec  7 23:10:22 2011
@@ -21,16 +21,20 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -237,6 +241,7 @@ public class TestHDFSEventSink {
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);
+    sink.start();
 
     Calendar eventDate = Calendar.getInstance();
 
@@ -414,6 +419,7 @@ public class TestHDFSEventSink {
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);
+    sink.start();
 
     Calendar eventDate = Calendar.getInstance();
 
@@ -450,4 +456,199 @@ public class TestHDFSEventSink {
     sink.stop();
   }
 
+  /*
+   * Append using the slow sink writer.
+   * Verify that process() returns BACKOFF due to the append timeout.
+   */
+  @Test
+  public void testSlowAppendFailure() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+
+    final long txnMax = 2;
+    final String fileName = "FlumeData";
+    final long rollCount = 5;
+    final long batchSize = 2;
+    final int numBatches = 2;
+    String newPath = testPath + "/singleBucket";
+    int i = 1, j = 1;
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    // create HDFS sink with slow writer
+    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    Context context = new Context();
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+        event.getHeaders().put("slow", "1500");
+        event.setBody(("Test." + i + "." + j).getBytes());
+        channel.put(event);
+      }
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      Status status = sink.process();
+
+      // verify that the append returned BACKOFF due to the timeout
+      Assert.assertEquals(Status.BACKOFF, status);
+    }
+
+    sink.stop();
+  }
+
+  /*
+   * Append using the slow sink writer with the specified append timeout.
+   * Verify that the data is written correctly to the files.
+   */
+  private void slowAppendTestHelper(long appendTimeout) throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+    final long txnMax = 2;
+    final String fileName = "FlumeData";
+    final long rollCount = 5;
+    final long batchSize = 2;
+    final int numBatches = 2;
+    String newPath = testPath + "/singleBucket";
+    int totalEvents = 0;
+    int i = 1, j = 1;
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    // create HDFS sink with slow writer
+    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    Context context = new Context();
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.appendTimeout", String.valueOf(appendTimeout));
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+        event.getHeaders().put("slow", "1500");
+        event.setBody(("Test." + i + "." + j).getBytes());
+        channel.put(event);
+        totalEvents++;
+      }
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      sink.process();
+    }
+
+    sink.stop();
+
+    // loop through all the files generated and check their contents
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path fList[] = FileUtil.stat2Paths(dirStat);
+
+    // check that the roll happened correctly for the given data
+    // Note that we'll end up with one last file with only header
+    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
+
+    try {
+      i = j = 1;
+      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+        LongWritable key = new LongWritable();
+        BytesWritable value = new BytesWritable();
+        BytesWritable expValue;
+
+        while (reader.next(key, value)) {
+          expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
+          Assert.assertEquals(expValue, value);
+          if (++j > txnMax) {
+            j = 1;
+            i++;
+          }
+        }
+        reader.close();
+      }
+    } catch (IOException ioe) {
+      System.err.println("IOException during operation: " + ioe.toString());
+      System.exit(1);
+    }
+    Assert.assertEquals(1, i);
+  }
+
+  /*
+   * Append using the slow sink writer with a long append timeout.
+   * Verify that the data is written correctly to the files.
+   */
+  @Test
+  public void testSlowAppendWithLongTimeout() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+    slowAppendTestHelper(3000);
+  }
+
+  /*
+   * Append using the slow sink writer with no timeout, which makes the append
+   * synchronous. Verify that the data is written correctly to the files.
+   */
+  @Test
+  public void testSlowAppendWithoutTimeout() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+    slowAppendTestHelper(0);
+  }
+
 }
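
For reference, the new hdfs.appendTimeout property is set like any other hdfs.* sink property and
is interpreted in milliseconds; a value of 0 keeps the old behaviour of waiting indefinitely. A
minimal configuration sketch, assuming the sink's no-argument constructor and a hypothetical HDFS
path, following the same Context/Configurables pattern the tests above use:

    import org.apache.flume.Context;
    import org.apache.flume.conf.Configurables;
    import org.apache.flume.sink.hdfs.HDFSEventSink;

    public class AppendTimeoutConfigSketch {
      public static void main(String[] args) {
        HDFSEventSink sink = new HDFSEventSink();            // assumes the default constructor
        Context context = new Context();
        context.put("hdfs.path", "hdfs://namenode/flume/events");  // hypothetical path
        context.put("hdfs.appendTimeout", "1000");  // wait at most 1s per append; "0" waits indefinitely
        Configurables.configure(sink, context);
      }
    }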