Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/20 02:49:57 UTC

flume git commit: FLUME-2922. Sync SequenceFile.Writer before calling hflush

Repository: flume
Updated Branches:
  refs/heads/trunk 9965dae7b -> 358bb6700


FLUME-2922. Sync SequenceFile.Writer before calling hflush

This closes #52

(Kevin Conaway via Mike Percy)
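
For context on the one-line fix: with block compression, SequenceFile.Writer
buffers records in an in-memory compression buffer, so hflush() on the
underlying FSDataOutputStream pushes only the bytes that have already left the
writer; anything still sitting in that buffer is invisible to readers until the
file is closed. Calling Writer.sync() first forces the buffered block (and a
sync marker) down into the stream before it is flushed. A minimal sketch of the
corrected ordering, with names simplified and hflush() standing in for Flume's
hflushOrSync() helper:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;

// Sketch only: mirrors the ordering in HDFSSequenceFile.sync() in the diff below.
// "outStream" is the raw HDFS stream that "writer" wraps.
class FlushOrderingSketch {
  private SequenceFile.Writer writer;
  private FSDataOutputStream outStream;

  void sync() throws IOException {
    writer.sync();      // drain the writer's buffered (compressed) block into outStream
    outStream.hflush(); // then push outStream's bytes out to the DataNodes
  }
}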


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/358bb670
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/358bb670
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/358bb670

Branch: refs/heads/trunk
Commit: 358bb670029549ed4cff192c79307fd3e4d69972
Parents: 9965dae
Author: Kevin Conaway <ke...@walmart.com>
Authored: Thu Jun 9 15:50:13 2016 -0400
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 19:44:03 2016 -0700

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSSequenceFile.java       |  1 +
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 95 ++++++++++++++++++++
 2 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/358bb670/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index ba8b30d..c5430ba 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -109,6 +109,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
 
   @Override
   public void sync() throws IOException {
+    writer.sync();
     hflushOrSync(outStream);
   }
 

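The regression test below writes events through the sink, leaves the file open,
and reads the data back with SequenceFile.Reader over a raw input stream. The
same read pattern works outside the test harness; a hypothetical standalone
checker, assuming the sink's default Writable format (LongWritable timestamp
key, BytesWritable event body):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical utility: dump the records of a (possibly still-open) sequence file.
public class ReadOpenSequenceFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    fs.setVerifyChecksum(false); // a partial file may fail checksum verification
    try (SequenceFile.Reader reader = new SequenceFile.Reader(
             conf, SequenceFile.Reader.stream(fs.open(new Path(args[0]))))) {
      LongWritable key = new LongWritable();
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {
        System.out.println(key.get() + " -> " + new String(value.copyBytes()));
      }
    }
  }
}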
http://git-wip-us.apache.org/repos/asf/flume/blob/358bb670/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 73f016b..782cf47 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -1325,6 +1327,99 @@ public class TestHDFSEventSink {
     fs.close();
   }
 
+  /**
+   * This test simulates what happens when a batch of events is written to a block-compressed
+   * sequence file (and thus flushed to HDFS via sync() + hflush()) but the file is not yet
+   * closed.
+   *
+   * When this happens, the data written so far should still be readable.
+   */
+  @Test
+  public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException {
+    String hdfsPath = testPath + "/sequenceFileWriterSync";
+    FileSystem fs = FileSystem.get(new Configuration());
+    // Since we are reading a partial file, we don't want to use checksums
+    fs.setVerifyChecksum(false);
+    fs.setWriteChecksum(false);
+
+    // Compression codecs that don't require native Hadoop libraries
+    String[] codecs = {"BZip2Codec", "DeflateCodec"};
+
+    for (String codec : codecs) {
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Collections.singletonList(
+          "single-event"
+      ));
+
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Arrays.asList(
+          "multiple-events-1",
+          "multiple-events-2",
+          "multiple-events-3",
+          "multiple-events-4",
+          "multiple-events-5"
+      ));
+    }
+
+    fs.close();
+  }
+
+  private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, String codec,
+                                                Collection<String> eventBodies)
+      throws IOException, EventDeliveryException {
+    Path dirPath = new Path(hdfsPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+    context.put("hdfs.path", hdfsPath);
+    // Ensure the file isn't closed and rolled
+    context.put("hdfs.rollCount", String.valueOf(eventBodies.size() + 1));
+    context.put("hdfs.rollSize", "0");
+    context.put("hdfs.rollInterval", "0");
+    context.put("hdfs.batchSize", "1");
+    context.put("hdfs.fileType", "SequenceFile");
+    context.put("hdfs.codeC", codec);
+    context.put("hdfs.writeFormat", "Writable");
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (String eventBody : eventBodies) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event event = new SimpleEvent();
+      event.setBody(eventBody.getBytes());
+      channel.put(event);
+
+      txn.commit();
+      txn.close();
+
+      sink.process();
+    }
+
+    // The sink is _not_ stopped: the file should remain open, but the data
+    // written so far should be visible to readers via sync() + hflush()
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path[] paths = FileUtil.stat2Paths(dirStat);
+
+    Assert.assertEquals(1, paths.length);
+
+    SequenceFile.Reader reader =
+        new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.stream(fs.open(paths[0])));
+    LongWritable key = new LongWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (String eventBody : eventBodies) {
+      Assert.assertTrue(reader.next(key, value));
+      Assert.assertArrayEquals(eventBody.getBytes(), value.copyBytes());
+    }
+
+    Assert.assertFalse(reader.next(key, value));
+  }
+
   private Context getContextForRetryTests() {
     Context context = new Context();