Posted to commits@flume.apache.org by hs...@apache.org on 2013/01/16 02:42:56 UTC
git commit: FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.
Updated Branches:
refs/heads/trunk 750809c70 -> 118752374
FLUME-1844. HDFSEventSink should have option to use RawLocalFileSystem.
(Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11875237
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11875237
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11875237
Branch: refs/heads/trunk
Commit: 11875237420eef3e47ad84e7998e2caff6fc2de6
Parents: 750809c
Author: Hari Shreedharan <ha...@gmail.com>
Authored: Tue Jan 15 17:31:51 2013 -0800
Committer: Hari Shreedharan <ha...@gmail.com>
Committed: Tue Jan 15 17:42:41 2013 -0800
----------------------------------------------------------------------
.../org/apache/flume/sink/hdfs/BucketWriter.java | 3 +-
.../flume/sink/hdfs/HDFSCompressedDataStream.java | 15 ++
.../org/apache/flume/sink/hdfs/HDFSDataStream.java | 21 +++-
.../apache/flume/sink/hdfs/HDFSSequenceFile.java | 19 +++-
.../apache/flume/sink/hdfs/TestHDFSEventSink.java | 17 ++-
.../flume/sink/hdfs/TestUseRawLocalFileSystem.java | 105 +++++++++++++++
6 files changed, 174 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
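Note: the new flag is read from the sink's Context as hdfs.useRawLocalFileSystem (default false) and is consulted by all three writer implementations changed below. A minimal sketch of exercising it the same way the new TestUseRawLocalFileSystem test does; the class name, temp directory and file name are illustrative, and it assumes the default filesystem resolves to the local one, as it does in the unit tests.

import java.io.File;

import org.apache.flume.Context;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.hdfs.HDFSDataStream;

import com.google.common.base.Charsets;
import com.google.common.io.Files;

public class RawLocalQuickCheck {
  public static void main(String[] args) throws Exception {
    File baseDir = Files.createTempDir();                     // throwaway local directory
    File target = new File(baseDir, "raw-local-check");

    Context context = new Context();
    context.put("hdfs.useRawLocalFileSystem", "true");        // the flag added by this commit

    HDFSDataStream stream = new HDFSDataStream();
    stream.configure(context);                                // reads the flag and the serializer type
    stream.open(target.getCanonicalPath());                   // plain local path -> LocalFileSystem -> getRaw()
    stream.append(EventBuilder.withBody("test", Charsets.UTF_8));
    stream.sync();                                            // with the raw filesystem the bytes are on disk now
    stream.close();

    System.out.println("bytes written: " + target.length());  // non-zero, as the new test asserts
  }
}

Without the flag, or on a path that does not resolve to a LocalFileSystem, the writers behave exactly as before; the instanceof check below only swaps in the raw filesystem for local paths.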
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index f14f7cb..3f31ef2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -34,6 +34,7 @@ import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -128,7 +129,7 @@ class BucketWriter {
fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
isOpen = false;
- writer.configure(context);
+ this.writer.configure(context);
}
/**
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 18fe6d4..95eb252 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -26,6 +26,7 @@ import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -46,12 +47,17 @@ public class HDFSCompressedDataStream implements HDFSWriter {
private String serializerType;
private Context serializerContext;
private EventSerializer serializer;
+ private boolean useRawLocalFileSystem;
@Override
public void configure(Context context) {
serializerType = context.getString("serializer", "TEXT");
+ useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+ false);
serializerContext = new Context(
context.getSubProperties(EventSerializer.CTX_PREFIX));
+ logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+ + useRawLocalFileSystem);
}
@Override
@@ -67,6 +73,14 @@ public class HDFSCompressedDataStream implements HDFSWriter {
Configuration conf = new Configuration();
Path dstPath = new Path(filePath);
FileSystem hdfs = dstPath.getFileSystem(conf);
+ if(useRawLocalFileSystem) {
+ if(hdfs instanceof LocalFileSystem) {
+ hdfs = ((LocalFileSystem)hdfs).getRaw();
+ } else {
+ logger.warn("useRawLocalFileSystem is set to true but file system " +
+ "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+ }
+ }
boolean appending = false;
if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
@@ -76,6 +90,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
} else {
fsOut = hdfs.create(dstPath);
}
+ System.out.println("fsOut " + fsOut);
cmpOut = codec.createOutputStream(fsOut);
serializer = EventSerializerFactory.getInstance(serializerType,
serializerContext, cmpOut);
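The block added above, and repeated in HDFSDataStream and HDFSSequenceFile below, is the core of the change: when the flag is set and the path resolves to Hadoop's checksumming LocalFileSystem, the writer unwraps it to the underlying RawLocalFileSystem, whose flushes reach the local file directly and which writes no .crc side files. Condensed into a standalone helper for readability; the class and method names are illustrative, not part of the commit, and the warning branch is omitted.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

final class RawLocalFsSelector {
  /** Resolve the filesystem for dstPath, unwrapping to the raw local FS when asked. */
  static FileSystem select(Path dstPath, Configuration conf,
      boolean useRawLocalFileSystem) throws IOException {
    FileSystem fs = dstPath.getFileSystem(conf);
    if (useRawLocalFileSystem && fs instanceof LocalFileSystem) {
      // getRaw() returns the wrapped RawLocalFileSystem, bypassing the
      // client-side checksumming layer of ChecksumFileSystem.
      fs = ((LocalFileSystem) fs).getRaw();
    }
    return fs;
  }
}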
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index bd40a88..04120ec 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -26,22 +26,33 @@ import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HDFSDataStream implements HDFSWriter {
- private FSDataOutputStream outStream;
+ private static final Logger logger =
+ LoggerFactory.getLogger(HDFSDataStream.class);
+
+ private FSDataOutputStream outStream;
private String serializerType;
private Context serializerContext;
private EventSerializer serializer;
+ private boolean useRawLocalFileSystem;
@Override
public void configure(Context context) {
serializerType = context.getString("serializer", "TEXT");
+ useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+ false);
serializerContext =
new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
+ logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
+ + useRawLocalFileSystem);
}
@Override
@@ -49,6 +60,14 @@ public class HDFSDataStream implements HDFSWriter {
Configuration conf = new Configuration();
Path dstPath = new Path(filePath);
FileSystem hdfs = dstPath.getFileSystem(conf);
+ if(useRawLocalFileSystem) {
+ if(hdfs instanceof LocalFileSystem) {
+ hdfs = ((LocalFileSystem)hdfs).getRaw();
+ } else {
+ logger.warn("useRawLocalFileSystem is set to true but file system " +
+ "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+ }
+ }
boolean appending = false;
if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 1e6d68f..e127f6a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -24,17 +24,23 @@ import org.apache.flume.Event;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HDFSSequenceFile implements HDFSWriter {
+ private static final Logger logger =
+ LoggerFactory.getLogger(HDFSSequenceFile.class);
private SequenceFile.Writer writer;
private String writeFormat;
private Context serializerContext;
private SeqFileFormatter formatter;
+ private boolean useRawLocalFileSystem;
public HDFSSequenceFile() {
writer = null;
@@ -44,10 +50,14 @@ public class HDFSSequenceFile implements HDFSWriter {
public void configure(Context context) {
// use binary writable format by default
writeFormat = context.getString("hdfs.writeFormat", SeqFileFormatterType.Writable.name());
+ useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
+ false);
serializerContext = new Context(
context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX));
formatter = SeqFileFormatterFactory
.getFormatter(writeFormat, serializerContext);
+ logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
+ + useRawLocalFileSystem);
}
@Override
@@ -61,7 +71,14 @@ public class HDFSSequenceFile implements HDFSWriter {
Configuration conf = new Configuration();
Path dstPath = new Path(filePath);
FileSystem hdfs = dstPath.getFileSystem(conf);
-
+ if(useRawLocalFileSystem) {
+ if(hdfs instanceof LocalFileSystem) {
+ hdfs = ((LocalFileSystem)hdfs).getRaw();
+ } else {
+ logger.warn("useRawLocalFileSystem is set to true but file system " +
+ "is not of type LocalFileSystem: " + hdfs.getClass().getName());
+ }
+ }
if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
FSDataOutputStream outStream = hdfs.append(dstPath);
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 1035ac3..cdddd50 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -112,8 +112,17 @@ public class TestHDFSEventSink {
}
@Test
- public void testTextBatchAppend() throws InterruptedException, LifecycleException,
- EventDeliveryException, IOException {
+ public void testTextBatchAppend() throws Exception {
+ doTestTextBatchAppend(false);
+ }
+
+ @Test
+ public void testTextBatchAppendRawFS() throws Exception {
+ doTestTextBatchAppend(true);
+ }
+
+ public void doTestTextBatchAppend(boolean useRawLocalFileSystem)
+ throws Exception {
LOG.debug("Starting...");
final long rollCount = 10;
@@ -140,6 +149,8 @@ public class TestHDFSEventSink {
context.put("hdfs.rollSize", "0");
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.writeFormat", "Text");
+ context.put("hdfs.useRawLocalFileSystem",
+ Boolean.toString(useRawLocalFileSystem));
context.put("hdfs.fileType", "DataStream");
Configurables.configure(sink, context);
@@ -154,7 +165,7 @@ public class TestHDFSEventSink {
List<String> bodies = Lists.newArrayList();
// push the event batches into channel to roll twice
- for (i = 1; i <= rollCount*2/batchSize; i++) {
+ for (i = 1; i <= (rollCount*10)/batchSize; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
for (j = 1; j <= batchSize; j++) {
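The widened loop bound above simply pushes more data per run: with the test's rollCount of 10, (rollCount*10)/batchSize batches of batchSize events is 100 events in total, enough to roll ten output files rather than two. A quick sanity check of that arithmetic, with an illustrative batchSize value.

public class RollMath {
  public static void main(String[] args) {
    long rollCount = 10;                            // hdfs.rollCount in the test
    long batchSize = 2;                             // illustrative; the test uses its own constant
    long batches = (rollCount * 10) / batchSize;    // 50 transactions pushed into the channel
    long totalEvents = batches * batchSize;         // 100 events in total
    long rolledFiles = totalEvents / rollCount;     // 10 files rolled instead of 2
    System.out.println(batches + " batches, " + totalEvents + " events, "
        + rolledFiles + " rolled files");
  }
}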
http://git-wip-us.apache.org/repos/asf/flume/blob/11875237/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
new file mode 100644
index 0000000..ffbdde0
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Clock;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestUseRawLocalFileSystem {
+
+ private static Logger logger =
+ LoggerFactory.getLogger(TestUseRawLocalFileSystem.class);
+ private Context context;
+
+ private File baseDir;
+ private File testFile;
+ private Event event;
+
+ @Before
+ public void setup() throws Exception {
+ baseDir = Files.createTempDir();
+ testFile = new File(baseDir.getAbsoluteFile(), "test");
+ context = new Context();
+ event = EventBuilder.withBody("test", Charsets.UTF_8);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ FileUtils.deleteQuietly(baseDir);
+ }
+
+ @Test
+ public void testTestFile() throws Exception {
+ String file = testFile.getCanonicalPath();
+ HDFSDataStream stream = new HDFSDataStream();
+ context.put("hdfs.useRawLocalFileSystem", "true");
+ stream.configure(context);
+ stream.open(file);
+ stream.append(event);
+ stream.sync();
+ Assert.assertTrue(testFile.length() > 0);
+ }
+ @Test
+ public void testCompressedFile() throws Exception {
+ String file = testFile.getCanonicalPath();
+ HDFSCompressedDataStream stream = new HDFSCompressedDataStream();
+ context.put("hdfs.useRawLocalFileSystem", "true");
+ stream.configure(context);
+ stream.open(file, new GzipCodec(), CompressionType.RECORD);
+ stream.append(event);
+ stream.sync();
+ Assert.assertTrue(testFile.length() > 0);
+ }
+ @Test
+ public void testSequenceFile() throws Exception {
+ String file = testFile.getCanonicalPath();
+ HDFSSequenceFile stream = new HDFSSequenceFile();
+ context.put("hdfs.useRawLocalFileSystem", "true");
+ stream.configure(context);
+ stream.open(file);
+ stream.append(event);
+ stream.sync();
+ Assert.assertTrue(testFile.length() > 0);
+ }
+}
\ No newline at end of file
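A possible follow-up check, not part of this commit: because the raw local filesystem bypasses ChecksumFileSystem entirely, no hidden ".<name>.crc" side file should appear next to the output. A hypothetical extra method for the TestUseRawLocalFileSystem class above, reusing its context, testFile and event fields.

  @Test
  public void testNoCrcSideFile() throws Exception {
    String file = testFile.getCanonicalPath();
    HDFSSequenceFile stream = new HDFSSequenceFile();
    context.put("hdfs.useRawLocalFileSystem", "true");
    stream.configure(context);
    stream.open(file);
    stream.append(event);
    stream.sync();
    stream.close();
    // ChecksumFileSystem would have written ".test.crc"; the raw FS must not.
    File crcFile = new File(testFile.getParentFile(), "." + testFile.getName() + ".crc");
    Assert.assertFalse(crcFile.exists());
    Assert.assertTrue(testFile.length() > 0);
  }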