You are viewing a plain text version of this content. The canonical (HTML) version, including the original hyperlink, is available on the mailing-list archive page.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/26 05:31:59 UTC

svn commit: r1305207 - /incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java

Author: arvind
Date: Mon Mar 26 03:31:59 2012
New Revision: 1305207

URL: http://svn.apache.org/viewvc?rev=1305207&view=rev
Log:
FLUME-881. HDFS Sink to select writableFormat based on file type.

(Jarek Jarcec Cecho via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1305207&r1=1305206&r2=1305207&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Mon Mar 26 03:31:59 2012
@@ -66,7 +66,6 @@ public class HDFSEventSink extends Abstr
   static final long defaultTxnEventMax = 100;
   static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   static final int defaultMaxOpenFiles = 5000;
-  static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
   /**
    * Default length of time we wait for an append
    * before closing the file and moving on.
@@ -164,7 +163,7 @@ public class HDFSEventSink extends Abstr
     String codecName = context.getString("hdfs.codeC");
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
-    writeFormat = context.getString("hdfs.writeFormat", defaultWriteFormat);
+    writeFormat = context.getString("hdfs.writeFormat");
     appendTimeout = context.getLong("hdfs.appendTimeout", defaultAppendTimeout);
     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
@@ -184,6 +183,18 @@ public class HDFSEventSink extends Abstr
       compType = CompressionType.BLOCK;
     }
 
+    if (writeFormat == null) {
+      // Default write formatter is chosen by requested file type
+      if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
+         || fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
+        // Output is written into text files, by default separate events by \n
+        this.writeFormat = HDFSFormatterFactory.hdfsTextFormat;
+      } else {
+        // Output is written into binary files, so use binary writable format
+        this.writeFormat = HDFSFormatterFactory.hdfsWritableFormat;
+      }
+    }
+
     boolean useSec = isHadoopSecurityEnabled();
     LOG.info("Hadoop Security enabled: " + useSec);