Posted to commits@flume.apache.org by hs...@apache.org on 2013/01/16 02:42:56 UTC

git commit: FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.

Updated Branches:
  refs/heads/trunk 750809c70 -> 118752374


FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.

(Brock Noland via Hari Shreedharan)
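
The flag introduced here is read by each HDFS writer as hdfs.useRawLocalFileSystem (default: false). Assuming the usual Flume NG agent/sink property naming, enabling it in an agent configuration might look like the following sketch (the agent and sink names here are illustrative, not taken from this commit; hdfs.fileType = DataStream mirrors the setting used in the test below):

  agent1.sinks.hdfsSink.type = hdfs
  agent1.sinks.hdfsSink.hdfs.path = file:///var/log/flume
  agent1.sinks.hdfsSink.hdfs.fileType = DataStream
  agent1.sinks.hdfsSink.hdfs.useRawLocalFileSystem = true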


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11875237
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11875237
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11875237

Branch: refs/heads/trunk
Commit: 11875237420eef3e47ad84e7998e2caff6fc2de6
Parents: 750809c
Author: Hari Shreedharan <ha...@gmail.com>
Authored: Tue Jan 15 17:31:51 2013 -0800
Committer: Hari Shreedharan <ha...@gmail.com>
Committed: Tue Jan 15 17:42:41 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |    3 +-
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |   15 ++
 .../org/apache/flume/sink/hdfs/HDFSDataStream.java |   21 +++-
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |   19 +++-
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   17 ++-
 .../flume/sink/hdfs/TestUseRawLocalFileSystem.java |  105 +++++++++++++++
 6 files changed, 174 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index f14f7cb..3f31ef2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -34,6 +34,7 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -128,7 +129,7 @@ class BucketWriter {
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
-    writer.configure(context);
+    this.writer.configure(context);
   }
 
   /**
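
The one-line change above appears to disambiguate the writer field from a like-named constructor parameter. As a minimal sketch of the Java shadowing rule involved (an illustrative class, not Flume code): inside a constructor, a parameter that shares a field's name hides the field, so "this." is required to reach the field itself.

  class Holder {
    private Runnable task;
    Holder(Runnable task) {
      this.task = task; // assigns the field; a bare "task = task" would be a no-op
      this.task.run();  // explicit "this." always refers to the field
    }
  }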

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 18fe6d4..95eb252 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -26,6 +26,7 @@ import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -46,12 +47,17 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   private String serializerType;
   private Context serializerContext;
   private EventSerializer serializer;
+  private boolean useRawLocalFileSystem;
 
   @Override
   public void configure(Context context) {
     serializerType = context.getString("serializer", "TEXT");
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext = new Context(
         context.getSubProperties(EventSerializer.CTX_PREFIX));
+    logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -67,6 +73,14 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system " +
+            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
 
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
@@ -76,6 +90,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     } else {
       fsOut = hdfs.create(dstPath);
     }
+    logger.debug("fsOut = " + fsOut);
     cmpOut = codec.createOutputStream(fsOut);
     serializer = EventSerializerFactory.getInstance(serializerType,
         serializerContext, cmpOut);
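
The unwrapping pattern added in this file (and repeated in the other writers below) relies on stock Hadoop APIs: LocalFileSystem extends ChecksumFileSystem, and getRaw() returns the underlying RawLocalFileSystem, which bypasses the checksumming wrapper and its side-car .crc files. A self-contained sketch of the same pattern, with a placeholder path:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocalFileSystem;
  import org.apache.hadoop.fs.Path;

  public class RawLocalExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Path dst = new Path("file:///tmp/flume-example/test");
      FileSystem fs = dst.getFileSystem(conf); // resolves to LocalFileSystem for file:// URIs
      if (fs instanceof LocalFileSystem) {
        fs = ((LocalFileSystem) fs).getRaw(); // unwrap to RawLocalFileSystem
      }
      System.out.println(fs.getClass().getName());
    }
  }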

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index bd40a88..04120ec 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -26,22 +26,33 @@ import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HDFSDataStream implements HDFSWriter {
-  private FSDataOutputStream outStream;
 
+  private static final Logger logger =
+      LoggerFactory.getLogger(HDFSDataStream.class);
+
+  private FSDataOutputStream outStream;
   private String serializerType;
   private Context serializerContext;
   private EventSerializer serializer;
+  private boolean useRawLocalFileSystem;
 
   @Override
   public void configure(Context context) {
     serializerType = context.getString("serializer", "TEXT");
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext =
         new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
+    logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -49,6 +60,14 @@ public class HDFSDataStream implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system " +
+            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
 
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 1e6d68f..e127f6a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -24,17 +24,23 @@ import org.apache.flume.Event;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HDFSSequenceFile implements HDFSWriter {
 
+  private static final Logger logger =
+      LoggerFactory.getLogger(HDFSSequenceFile.class);
   private SequenceFile.Writer writer;
   private String writeFormat;
   private Context serializerContext;
   private SeqFileFormatter formatter;
+  private boolean useRawLocalFileSystem;
 
   public HDFSSequenceFile() {
     writer = null;
@@ -44,10 +50,14 @@ public class HDFSSequenceFile implements HDFSWriter {
   public void configure(Context context) {
     // use binary writable format by default
     writeFormat = context.getString("hdfs.writeFormat", SeqFileFormatterType.Writable.name());
+    useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+        false);
     serializerContext = new Context(
             context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX));
     formatter = SeqFileFormatterFactory
             .getFormatter(writeFormat, serializerContext);
+    logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
+        + useRawLocalFileSystem);
   }
 
   @Override
@@ -61,7 +71,14 @@ public class HDFSSequenceFile implements HDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
-
+    if(useRawLocalFileSystem) {
+      if(hdfs instanceof LocalFileSystem) {
+        hdfs = ((LocalFileSystem)hdfs).getRaw();
+      } else {
+        logger.warn("useRawLocalFileSystem is set to true but file system " +
+            "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+      }
+    }
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
             (dstPath)) {
       FSDataOutputStream outStream = hdfs.append(dstPath);

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1035ac3..cdddd50 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -112,8 +112,17 @@ public class TestHDFSEventSink {
   }
 
   @Test
-  public void testTextBatchAppend() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
+  public void testTextBatchAppend() throws Exception {
+    doTestTextBatchAppend(false);
+  }
+
+  @Test
+  public void testTextBatchAppendRawFS() throws Exception {
+    doTestTextBatchAppend(true);
+  }
+
+  public void doTestTextBatchAppend(boolean useRawLocalFileSystem)
+      throws Exception {
     LOG.debug("Starting...");
 
     final long rollCount = 10;
@@ -140,6 +149,8 @@ public class TestHDFSEventSink {
     context.put("hdfs.rollSize", "0");
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
+    context.put("hdfs.useRawLocalFileSystem",
+        Boolean.toString(useRawLocalFileSystem));
     context.put("hdfs.fileType", "DataStream");
 
     Configurables.configure(sink, context);
@@ -154,7 +165,7 @@ public class TestHDFSEventSink {
     List<String> bodies = Lists.newArrayList();
 
     // push the event batches into channel to roll twice
-    for (i = 1; i <= rollCount*2/batchSize; i++) {
+    for (i = 1; i <= (rollCount*10)/batchSize; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
       for (j = 1; j <= batchSize; j++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
new file mode 100644
index 0000000..ffbdde0
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Clock;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestUseRawLocalFileSystem {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestUseRawLocalFileSystem.class);
+  private Context context;
+
+  private File baseDir;
+  private File testFile;
+  private Event event;
+
+  @Before
+  public void setup() throws Exception {
+    baseDir = Files.createTempDir();
+    testFile = new File(baseDir.getAbsoluteFile(), "test");
+    context = new Context();
+    event = EventBuilder.withBody("test", Charsets.UTF_8);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    FileUtils.deleteQuietly(baseDir);
+  }
+
+  @Test
+  public void testTestFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSDataStream stream = new HDFSDataStream();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+  @Test
+  public void testCompressedFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSCompressedDataStream stream = new HDFSCompressedDataStream();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file, new GzipCodec(), CompressionType.RECORD);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+  @Test
+  public void testSequenceFile() throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSSequenceFile stream = new HDFSSequenceFile();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    stream.configure(context);
+    stream.open(file);
+    stream.append(event);
+    stream.sync();
+    Assert.assertTrue(testFile.length() > 0);
+  }
+}
\ No newline at end of file
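
Assuming a standard checkout, one way to run just the new tests is Maven Surefire's test filter from the sink module:

  cd flume-ng-sinks/flume-hdfs-sink
  mvn test -Dtest=TestUseRawLocalFileSystem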