Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/20 02:49:57 UTC
flume git commit: FLUME-2922. Sync SequenceFile.Writer before calling hflush
Repository: flume
Updated Branches:
refs/heads/trunk 9965dae7b -> 358bb6700
FLUME-2922. Sync SequenceFile.Writer before calling hflush
This closes #52
(Kevin Conaway via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/358bb670
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/358bb670
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/358bb670
Branch: refs/heads/trunk
Commit: 358bb670029549ed4cff192c79307fd3e4d69972
Parents: 9965dae
Author: Kevin Conaway <ke...@walmart.com>
Authored: Thu Jun 9 15:50:13 2016 -0400
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 19:44:03 2016 -0700
----------------------------------------------------------------------
.../flume/sink/hdfs/HDFSSequenceFile.java | 1 +
.../flume/sink/hdfs/TestHDFSEventSink.java | 95 ++++++++++++++++++++
2 files changed, 96 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/358bb670/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index ba8b30d..c5430ba 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -109,6 +109,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
   @Override
   public void sync() throws IOException {
+    writer.sync();
     hflushOrSync(outStream);
   }
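
The one-line change above is the entire fix. With block compression, SequenceFile.Writer buffers appended records in an in-memory block, so calling hflush() on the underlying FSDataOutputStream alone publishes nothing new: the buffered records only reach the stream when the block is written out. Calling writer.sync() first forces the current block (plus a sync marker) onto the stream, after which hflush() makes those bytes visible to readers even though the file is still open. What follows is a minimal writer-side sketch of that ordering, not part of the commit; the local path and the DefaultCodec choice are assumptions for illustration (DefaultCodec needs no native Hadoop libraries).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SyncBeforeHflushSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/sync-sketch.seq"); // hypothetical path

    FSDataOutputStream out = fs.create(path);
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.stream(out),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(BytesWritable.class),
        // Block compression is the case the fix targets: records are
        // buffered in memory until a whole block is written to the stream.
        SequenceFile.Writer.compression(
            SequenceFile.CompressionType.BLOCK, new DefaultCodec()));

    writer.append(new LongWritable(System.currentTimeMillis()),
        new BytesWritable("event-body".getBytes()));

    // Without writer.sync(), the record above still sits in the writer's
    // in-memory block, and hflush() would have nothing new to publish.
    writer.sync();

    // The bytes are now in the stream; hflush() makes them visible to
    // concurrent readers. The file is intentionally left open here: that
    // is exactly the state the test below verifies.
    out.hflush();
  }
}
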
http://git-wip-us.apache.org/repos/asf/flume/blob/358bb670/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 73f016b..782cf47 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -1325,6 +1327,99 @@ public class TestHDFSEventSink {
     fs.close();
   }

+  /**
+   * This test simulates what happens when a batch of events is written to a compressed sequence
+   * file (and thus hsync'd to hdfs) but the file is not yet closed.
+   *
+   * When this happens, the data that we wrote should still be readable.
+   */
+  @Test
+  public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException {
+    String hdfsPath = testPath + "/sequenceFileWriterSync";
+    FileSystem fs = FileSystem.get(new Configuration());
+    // Since we are reading a partial file we don't want to use checksums
+    fs.setVerifyChecksum(false);
+    fs.setWriteChecksum(false);
+
+    // Compression codecs that don't require native hadoop libraries
+    String [] codecs = {"BZip2Codec", "DeflateCodec"};
+
+    for (String codec : codecs) {
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Collections.singletonList(
+          "single-event"
+      ));
+
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Arrays.asList(
+          "multiple-events-1",
+          "multiple-events-2",
+          "multiple-events-3",
+          "multiple-events-4",
+          "multiple-events-5"
+      ));
+    }
+
+    fs.close();
+  }
+
+  private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, String codec,
+                                                Collection<String> eventBodies)
+      throws IOException, EventDeliveryException {
+    Path dirPath = new Path(hdfsPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+    context.put("hdfs.path", hdfsPath);
+    // Ensure the file isn't closed and rolled
+    context.put("hdfs.rollCount", String.valueOf(eventBodies.size() + 1));
+    context.put("hdfs.rollSize", "0");
+    context.put("hdfs.rollInterval", "0");
+    context.put("hdfs.batchSize", "1");
+    context.put("hdfs.fileType", "SequenceFile");
+    context.put("hdfs.codeC", codec);
+    context.put("hdfs.writeFormat", "Writable");
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (String eventBody : eventBodies) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event event = new SimpleEvent();
+      event.setBody(eventBody.getBytes());
+      channel.put(event);
+
+      txn.commit();
+      txn.close();
+
+      sink.process();
+    }
+
+    // Sink is _not_ closed. The file should remain open but
+    // the data written should be visible to readers via sync + hflush
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path[] paths = FileUtil.stat2Paths(dirStat);
+
+    Assert.assertEquals(1, paths.length);
+
+    SequenceFile.Reader reader =
+        new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.stream(fs.open(paths[0])));
+    LongWritable key = new LongWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (String eventBody : eventBodies) {
+      Assert.assertTrue(reader.next(key, value));
+      Assert.assertArrayEquals(eventBody.getBytes(), value.copyBytes());
+    }
+
+    Assert.assertFalse(reader.next(key, value));
+  }
+
   private Context getContextForRetryTests() {
     Context context = new Context();
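
As a companion to the test above, here is a hedged reader-side sketch of what a consumer of the still-open file does: open the stream directly, skip checksum verification for the same reason the test does (the trailing checksum chunk may not yet cover the flushed data), and iterate until next() returns false. The path is hypothetical; the LongWritable key and BytesWritable value types match what the test reads back under hdfs.writeFormat = Writable.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class PartialSequenceFileReader {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Reading a partial file: don't verify checksums, as in the test.
    fs.setVerifyChecksum(false);

    Path path = new Path("/tmp/flume/events.1468982400000.tmp"); // hypothetical
    SequenceFile.Reader reader =
        new SequenceFile.Reader(conf, SequenceFile.Reader.stream(fs.open(path)));

    // Iterate whatever has been sync'd + hflush'd so far; next() returns
    // false once the readable portion of the open file is exhausted.
    LongWritable key = new LongWritable();
    BytesWritable value = new BytesWritable();
    while (reader.next(key, value)) {
      System.out.printf("%d: %s%n", key.get(), new String(value.copyBytes()));
    }
    reader.close();
  }
}
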