You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/03/20 00:33:11 UTC

git commit: FLUME-1955. fileSuffix does not work with compressed streams.

Updated Branches:
  refs/heads/trunk 4c5220bb7 -> 2f6fea509


FLUME-1955. fileSuffix does not work with compressed streams.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2f6fea50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2f6fea50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2f6fea50

Branch: refs/heads/trunk
Commit: 2f6fea50965e5a8ae059d179b0b25be738696cef
Parents: 4c5220b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Mar 19 16:32:08 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Mar 19 16:32:08 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |    6 +--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |    3 +-
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   31 +++++++++++++++
 3 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 774f297..0897c97 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -222,11 +222,9 @@ class BucketWriter {
 
         String fullFileName = fileName + "." + counter;
 
-        if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
+        if (fileSuffix != null && fileSuffix.length() > 0) {
           fullFileName += fileSuffix;
-        }
-
-        if(codeC != null) {
+        } else if (codeC != null) {
           fullFileName += codeC.getDefaultExtension();
         }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 741ac90..f0a6e4b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -294,7 +294,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     return false;
   }
 
-  private static CompressionCodec getCodec(String codecName) {
+  @VisibleForTesting
+  static CompressionCodec getCodec(String codecName) {
     Configuration conf = new Configuration();
     List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
         .getCodecClasses(conf);

http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 99e787e..f741e03 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -280,6 +280,37 @@ public class TestBucketWriter {
     }
 
   @Test
+  public void testFileSuffixCompressed()
+      throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String suffix = ".foo";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"),
+        SequenceFile.CompressionType.BLOCK, hdfsWriter,
+        timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
+
+    // Need to override system time use for test so we know what to expect
+    final long testTime = System.currentTimeMillis();
+
+    Clock testClock = new Clock() {
+      public long currentTimeMillis() {
+        return testTime;
+      }
+    };
+    bucketWriter.setClock(testClock);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
+        .endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+  }
+
+  @Test
   public void testInUsePrefix() throws IOException, InterruptedException {
     final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
     final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";