You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/26 05:31:59 UTC
svn commit: r1305207 -
/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Author: arvind
Date: Mon Mar 26 03:31:59 2012
New Revision: 1305207
URL: http://svn.apache.org/viewvc?rev=1305207&view=rev
Log:
FLUME-881. HDFS Sink to select writableFormat based on file type.
(Jarek Jarcec Cecho via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1305207&r1=1305206&r2=1305207&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Mon Mar 26 03:31:59 2012
@@ -66,7 +66,6 @@ public class HDFSEventSink extends Abstr
static final long defaultTxnEventMax = 100;
static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
static final int defaultMaxOpenFiles = 5000;
- static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
/**
* Default length of time we wait for an append
* before closing the file and moving on.
@@ -164,7 +163,7 @@ public class HDFSEventSink extends Abstr
String codecName = context.getString("hdfs.codeC");
fileType = context.getString("hdfs.fileType", defaultFileType);
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
- writeFormat = context.getString("hdfs.writeFormat", defaultWriteFormat);
+ writeFormat = context.getString("hdfs.writeFormat");
appendTimeout = context.getLong("hdfs.appendTimeout", defaultAppendTimeout);
callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
@@ -184,6 +183,18 @@ public class HDFSEventSink extends Abstr
compType = CompressionType.BLOCK;
}
+ if (writeFormat == null) {
+ // Default write formatter is chosen by requested file type
+ if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
+ || fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
+ // Output is written into text files; by default, events are separated by \n
+ this.writeFormat = HDFSFormatterFactory.hdfsTextFormat;
+ } else {
+ // Output is written into binary files, so use binary writable format
+ this.writeFormat = HDFSFormatterFactory.hdfsWritableFormat;
+ }
+ }
+
boolean useSec = isHadoopSecurityEnabled();
LOG.info("Hadoop Security enabled: " + useSec);