You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/03/20 00:33:11 UTC
git commit: FLUME-1955. fileSuffix does not work with compressed streams.
Updated Branches:
refs/heads/trunk 4c5220bb7 -> 2f6fea509
FLUME-1955. fileSuffix does not work with compressed streams.
(Mike Percy via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2f6fea50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2f6fea50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2f6fea50
Branch: refs/heads/trunk
Commit: 2f6fea50965e5a8ae059d179b0b25be738696cef
Parents: 4c5220b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Mar 19 16:32:08 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Mar 19 16:32:08 2013 -0700
----------------------------------------------------------------------
.../org/apache/flume/sink/hdfs/BucketWriter.java | 6 +--
.../org/apache/flume/sink/hdfs/HDFSEventSink.java | 3 +-
.../apache/flume/sink/hdfs/TestBucketWriter.java | 31 +++++++++++++++
3 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 774f297..0897c97 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -222,11 +222,9 @@ class BucketWriter {
String fullFileName = fileName + "." + counter;
- if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
+ if (fileSuffix != null && fileSuffix.length() > 0) {
fullFileName += fileSuffix;
- }
-
- if(codeC != null) {
+ } else if (codeC != null) {
fullFileName += codeC.getDefaultExtension();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 741ac90..f0a6e4b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -294,7 +294,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
return false;
}
- private static CompressionCodec getCodec(String codecName) {
+ @VisibleForTesting
+ static CompressionCodec getCodec(String codecName) {
Configuration conf = new Configuration();
List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
.getCodecClasses(conf);
http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 99e787e..f741e03 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -280,6 +280,37 @@ public class TestBucketWriter {
}
@Test
+ public void testFileSuffixCompressed()
+ throws IOException, InterruptedException {
+ final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+ final String suffix = ".foo";
+
+ MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+ BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+ "/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"),
+ SequenceFile.CompressionType.BLOCK, hdfsWriter,
+ timedRollerPool, null,
+ new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+ 0, null, null, 30000, Executors.newSingleThreadExecutor());
+
+ // Need to override system time use for test so we know what to expect
+ final long testTime = System.currentTimeMillis();
+
+ Clock testClock = new Clock() {
+ public long currentTimeMillis() {
+ return testTime;
+ }
+ };
+ bucketWriter.setClock(testClock);
+
+ Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+ bucketWriter.append(e);
+
+ Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
+ .endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+ }
+
+ @Test
public void testInUsePrefix() throws IOException, InterruptedException {
final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";