You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/05 00:26:50 UTC

svn commit: r1178987 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/formatter/output/ main/java/org/apache/flume/sink/ main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/

Author: esammer
Date: Tue Oct  4 22:26:50 2011
New Revision: 1178987

URL: http://svn.apache.org/viewvc?rev=1178987&view=rev
Log:
- Added initial implementation of an HDFS sink. (Contributed by Prasad Mujumdar prasadm@cloudera.com)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+public class BucketPath {
+
+  /**
+   * Public so other classes can search strings for tags. Group 1 captures the
+   * character of a %x shorthand (or %%); group 2 the name inside %{name}.
+   */
+  final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";
+  final public static Pattern tagPattern = Pattern.compile(TAG_REGEX);
+
+  /**
+   * Tells whether {@code in} holds a %{...} or %x escape (see TAG_REGEX).
+   */
+  public static boolean containsTag(String in) {
+    final Matcher tagMatcher = tagPattern.matcher(in);
+    return tagMatcher.find();
+  }
+
+  public static String expandShorthand(char c) {
+    // Maps a %x shorthand character to a descriptive attribute name.
+    switch (c) {
+    case 'a':
+      return "weekday_short";
+    case 'A':
+      return "weekday_full";
+    case 'b':
+      return "monthname_short";
+    case 'B':
+      return "monthname_full";
+    case 'c':
+      return "datetime";
+    case 'd':
+      return "day_of_month_xx"; // two digit
+    case 'D':
+      return "date_short"; // "MM/dd/yy";
+    case 'H':
+      return "hour_24_xx";
+    case 'I':
+      return "hour_12_xx";
+    case 'j':
+      return "day_of_year_xxx"; // three digits
+    case 'k':
+      return "hour_24"; // 1 or 2 digits
+    case 'l':
+      return "hour_12"; // 1 or 2 digits
+    case 'm':
+      return "month_xx";
+    case 'M':
+      return "minute_xx";
+    case 'p':
+      return "am_pm";
+    case 's':
+      return "unix_seconds";
+    case 'S':
+      return "seconds_xx";
+    case 't':
+      // This is different from unix date (which would insert a tab character
+      // here)
+      return "unix_millis";
+    case 'y':
+      return "year_xx";
+    case 'Y':
+      return "year_xxxx";
+    case 'z':
+      return "timezone_delta";
+    default:
+      // Unrecognized shorthand: the character itself serves as the name.
+      return "" + c;
+    }
+
+  }
+
+  /**
+   * Hardcoded lookups for %x style escape replacement. Add your own!
+   *
+   * Apart from %%, all shorthands are derived from the "timestamp" header
+   * (epoch milliseconds) and follow unix date, with a few exceptions.
+   *
+   * @return the replacement text; "" for an unrecognized escape. %t returns
+   *         the raw header, which is null when the header is absent.
+   * @throws NumberFormatException if a date escape finds no numeric timestamp
+   */
+  public static String replaceShorthand(char c, Map<String, String> headers) {
+    // Pattern handed to SimpleDateFormat for the date-based escapes.
+    String formatString = "";
+    switch (c) {
+    case '%':
+      return "%";
+    case 'a':
+      formatString = "EEE";
+      break;
+    case 'A':
+      formatString = "EEEE";
+      break;
+    case 'b':
+      formatString = "MMM";
+      break;
+    case 'B':
+      formatString = "MMMM";
+      break;
+    case 'c':
+      formatString = "EEE MMM d HH:mm:ss yyyy";
+      break;
+    case 'd':
+      formatString = "dd";
+      break;
+    case 'D':
+      formatString = "MM/dd/yy";
+      break;
+    case 'H':
+      formatString = "HH";
+      break;
+    case 'I':
+      formatString = "hh";
+      break;
+    case 'j':
+      formatString = "DDD";
+      break;
+    case 'k':
+      formatString = "H";
+      break;
+    case 'l':
+      formatString = "h";
+      break;
+    case 'm':
+      formatString = "MM";
+      break;
+    case 'M':
+      formatString = "mm";
+      break;
+    case 'p':
+      formatString = "a";
+      break;
+    case 's':
+      return Long.toString(Long.parseLong(headers.get("timestamp")) / 1000);
+    case 'S':
+      formatString = "ss";
+      break;
+    case 't':
+      // This is different from unix date (which would insert a tab character
+      // here)
+      return headers.get("timestamp");
+    case 'y':
+      formatString = "yy";
+      break;
+    case 'Y':
+      formatString = "yyyy";
+      break;
+    case 'z':
+      formatString = "ZZZ";
+      break;
+    default:
+      // Unrecognized escape: contributes nothing to the output.
+      return "";
+    }
+    SimpleDateFormat format = new SimpleDateFormat(formatString);
+    Date date = new Date(Long.parseLong(headers.get("timestamp")));
+    return format.format(date);
+  }
+
+  /**
+   * Replace all substrings of form %{tagname} with headers.get(tagname) and
+   * all shorthand substrings of form %x with a special value.
+   *
+   * Any unrecognized / not found tags will be replaced with the empty string.
+   * Replacement values are inserted literally: '$' and '\' inside a header
+   * value are not interpreted as regex replacement syntax.
+   *
+   * @param in the string containing escape sequences
+   * @param headers event headers used to resolve the escape sequences
+   * @return a copy of in with every escape sequence substituted
+   * @throws NumberFormatException if a timestamp-based shorthand other than
+   *         %t is used but the "timestamp" header is missing or not a number
+   */
+  public static String escapeString(String in, Map<String, String> headers) {
+    Matcher matcher = tagPattern.matcher(in);
+    // Matcher.appendReplacement takes a StringBuffer.
+    StringBuffer sb = new StringBuffer();
+
+    while (matcher.find()) {
+      String replacement;
+      // Group 2 is the %{...} pattern
+      if (matcher.group(2) != null) {
+        // May be null when the header is absent; handled below.
+        replacement = headers.get(matcher.group(2));
+      } else {
+        // The %x pattern.
+        // Since we know the match is a single character, we can
+        // switch on that rather than the string.
+        Preconditions.checkState(matcher.group(1) != null
+            && matcher.group(1).length() == 1,
+            "Expected to match single character tag in string " + in);
+        char c = matcher.group(1).charAt(0);
+        replacement = replaceShorthand(c, headers);
+      }
+
+      // A missing header (including the timestamp behind %t) yields null;
+      // substitute the empty string instead of failing below.
+      if (replacement == null) {
+        replacement = "";
+      }
+
+      // '$' and '\' are special in a replacement string (group references
+      // and escapes), so the value is quoted to be inserted literally.
+      matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+    }
+
+    // Copy the text that follows the last escape sequence.
+    matcher.appendTail(sb);
+    return sb.toString();
+  }
+
+  /**
+   * Instead of replacing escape sequences in a string, this method resolves
+   * each escape sequence found in the argument string and collects the
+   * results in a map. A %{name} tag is stored under name (empty string when
+   * the header is absent); a %x shorthand is stored under expandShorthand(x).
+   *
+   * @param in the string to scan for escape sequences
+   * @param headers event headers used to resolve the escape sequences
+   * @return a new map from attribute name to resolved value
+   */
+  public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers) {
+    Map<String, String> mapping = new HashMap<String, String>();
+    Matcher matcher = tagPattern.matcher(in);
+
+    while (matcher.find()) {
+      // Group 2 is only set for the %{...} form.
+      String tagName = matcher.group(2);
+      if (tagName == null) {
+        // The %x form: group 1 holds the single character after the
+        // percent sign.
+        String shorthand = matcher.group(1);
+        Preconditions.checkState(shorthand != null && shorthand.length() == 1,
+            "Expected to match single character tag in string " + in);
+        char c = shorthand.charAt(0);
+        String value = replaceShorthand(c, headers);
+        mapping.put(expandShorthand(c), value);
+        continue;
+      }
+      // A header that is absent maps to the empty string.
+      String headerValue = headers.get(tagName);
+      mapping.put(tagName, headerValue == null ? "" : headerValue);
+    }
+
+    return mapping;
+  }
+}
+

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink;
+
+import org.apache.flume.Event;
+
+public interface FlumeFormatter {
+  // Supplies key, value and raw-byte views of an Event to output writers.
+  @SuppressWarnings("rawtypes")
+  Class getKeyClass();
+  // NOTE(review): presumably the class of getValue() results - confirm.
+  @SuppressWarnings("rawtypes")
+  Class getValueClass();
+
+  Object getKey(Event e);
+
+  Object getValue(Event e);
+  // Bytes written to the output stream by the HDFS data stream writers.
+  byte[] getBytes(Event e);
+
+}
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class BucketWriter {
+
+  public static enum BucketFlushStatus {
+    BatchStarted, BatchPending, BatchFlushed
+  }
+
+  private HDFSWriter writer;
+  private FlumeFormatter formatter;
+  private long eventCounter; // events appended since the last reset
+  private long processSize; // event body bytes appended since the last reset
+  private long lastProcessTime; // stamped by append(); 0 after a reset
+  private long fileExentionCounter; // file name suffix, bumped by close()
+  private long batchCounter; // events appended since the last flush or reset
+  private String filePath;
+  private long rollInterval; // seconds (shouldRotate divides millis by 1000)
+  private long rollSize; // compared against processSize
+  private long rollCount; // compared against eventCounter
+  private long batchSize; // compared against batchCounter in sync()
+  private CompressionCodec codeC;
+  private CompressionType compType;
+
+  // Zero the per-file statistics and the batch counter; the file suffix
+  // counter (fileExentionCounter) is not touched here.
+  private void resetCounters() {
+    batchCounter = 0;
+    lastProcessTime = 0;
+    eventCounter = processSize = 0;
+  }
+
+  // Stores the roll and batch thresholds and zeroes every counter. No file
+  // is opened here; callers must invoke one of the open(...) overloads.
+  public BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize)
+      throws IOException {
+    this.fileExentionCounter = 0;
+    resetCounters();
+
+    this.rollInterval = rollInt;
+    this.rollSize = rollSz;
+    this.rollCount = rollCnt;
+    this.batchSize = bSize;
+  }
+
+  // Opens "<filePath>.<fileExentionCounter>" with the stored writer settings.
+  public void open() throws IOException {
+    if (filePath == null || writer == null || formatter == null) {
+      throw new IOException("Invalid file settings");
+    }
+    final String target = filePath + "." + fileExentionCounter;
+    if (codeC != null) {
+      writer.open(target, codeC, compType, formatter);
+    } else {
+      writer.open(target, formatter);
+    }
+  }
+
+  public void open(String fPath, HDFSWriter hWriter, FlumeFormatter fmt)
+      throws IOException {
+    open(fPath, null, CompressionType.NONE, hWriter, fmt); // no compression
+  }
+  // Stores the settings for later re-opens, then opens the first file.
+  public void open(String fPath, CompressionCodec codec, CompressionType cType,
+      HDFSWriter hWriter, FlumeFormatter fmt) throws IOException {
+    filePath = fPath; // prefix only; open() appends "." + fileExentionCounter
+    codeC = codec; // null makes open() use the writer's codec-less overload
+    compType = cType;
+    writer = hWriter;
+    formatter = fmt;
+    open();
+  }
+
+  // Resets the counters, then closes the writer (if set) and bumps the suffix.
+  public void close() throws IOException {
+    resetCounters();
+    if (writer != null) {
+      writer.close();
+      fileExentionCounter++; // the next open() targets a new file name
+    }
+  }
+
+  // Best-effort close: any IOException raised by close() is discarded.
+  public void abort() {
+    try {
+      close();
+    } catch (IOException eIO) {
+      // Swallowed on purpose; abort() offers no way to report the failure.
+    }
+  }
+
+  // Syncs the writer, then starts a new batch by zeroing batchCounter.
+  public void flush() throws IOException {
+    writer.sync();
+    batchCounter = 0; // not reached when writer.sync() throws
+  }
+
+  // Handles the batching: performs the real flush once a full batch has
+  // accumulated, otherwise only reports where the current batch stands.
+  //
+  // The comparison is ">=" rather than "==": if an earlier flush() threw,
+  // batchCounter was never reset and may already exceed batchSize; such a
+  // batch must still be flushed on the next call.
+  public BucketFlushStatus sync() throws IOException {
+    if (batchCounter >= batchSize) {
+      flush();
+      return BucketFlushStatus.BatchFlushed;
+    }
+    if (batchCounter == 1) {
+      return BucketFlushStatus.BatchStarted;
+    }
+    return BucketFlushStatus.BatchPending;
+  }
+
+  // Appends the event, updates the statistics, then either rolls the file or
+  // runs the batching logic. Returns BatchFlushed when the file was rolled.
+  public BucketFlushStatus append(Event e) throws IOException {
+    writer.append(e, formatter);
+
+    // update statistics
+    processSize += e.getBody().length;
+    // Plain epoch millis: shouldRotate() subtracts this value from
+    // System.currentTimeMillis(), so both sides must use the same unit.
+    lastProcessTime = System.currentTimeMillis();
+    eventCounter++;
+    batchCounter++;
+
+    // check if its time to rotate the file
+    // NOTE(review): the stamp above is refreshed right before this check, so
+    // the rollInterval test inside shouldRotate() cannot fire from here.
+    if (shouldRotate()) {
+      close();
+      open();
+      return BucketFlushStatus.BatchFlushed;
+    }
+    return sync();
+  }
+
+  // Decides whether a roll threshold (interval, count or size) was exceeded.
+  // A threshold of 0 or less is disabled. Side effect: a tripped count or
+  // size threshold zeroes its own counter.
+  public boolean shouldRotate() {
+    long elapsedSeconds = (System.currentTimeMillis() - lastProcessTime) / 1000;
+    boolean doRotate = (rollInterval > 0) && (elapsedSeconds > rollInterval);
+
+    if ((rollCount > 0) && (eventCounter > rollCount)) {
+      eventCounter = 0;
+      doRotate = true;
+    }
+    if ((rollSize > 0) && (processSize > rollSize)) {
+      processSize = 0;
+      doRotate = true;
+    }
+
+    return doRotate;
+  }
+
+  public String getFilePath() {
+    return filePath; // prefix passed to open(...); the numeric suffix is not included
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+public class HDFSCompressedDataStream implements HDFSWriter {
+  private CompressionOutputStream outStream;
+  // Set by sync(); tells append() to resetState() before writing again.
+  private boolean isFinished = false;
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    open(filePath, new DefaultCodec(), CompressionType.BLOCK, fmt);
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codec,
+      CompressionType cType, FlumeFormatter fmt) throws IOException {
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+    FSDataOutputStream fsOutStream = conf.getBoolean("hdfs.append.support",
+        false) ? hdfs.append(dstPath) : hdfs.create(dstPath);
+    outStream = codec.createOutputStream(fsOutStream);
+    isFinished = false; // a freshly created stream has nothing to reset
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+    if (isFinished) { // sync() ended the compressed block; begin a new one
+      outStream.resetState();
+      isFinished = false;
+    }
+    byte[] bValue = fmt.getBytes(e);
+    outStream.write(bValue, 0, bValue.length);
+  }
+
+  @Override
+  public void sync() throws IOException {
+    if (!isFinished) { // finish() once per block, not once per call
+      outStream.finish();
+      isFinished = true;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    outStream.close();
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+// HDFSWriter that writes the formatter's bytes for each event to the target
+// file as they are, without any compression.
+public class HDFSDataStream implements HDFSWriter {
+  private FSDataOutputStream outStream;
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    final Configuration cfg = new Configuration();
+    final Path target = new Path(filePath);
+    final FileSystem fs = target.getFileSystem(cfg);
+    // Append only when the configuration flag is switched on.
+    final boolean useAppend = cfg.getBoolean("hdfs.append.support", false);
+    outStream = useAppend ? fs.append(target) : fs.create(target);
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codec,
+      CompressionType cType, FlumeFormatter fmt) throws IOException {
+    // This writer never compresses: codec and cType are ignored.
+    open(filePath, fmt);
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+    final byte[] payload = fmt.getBytes(e);
+    outStream.write(payload, 0, payload.length);
+  }
+
+  @Override
+  public void sync() throws IOException {
+    outStream.sync();
+  }
+
+  @Override
+  public void close() throws IOException {
+    outStream.close();
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.flume.sink.hdfs.BucketWriter.BucketFlushStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HDFSEventSink extends AbstractSink implements PollableSink,
+    Configurable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HDFSEventSink.class);
+
+  static final long defaultRollInterval = 30;
+  static final long defaultRollSize = 1024;
+  static final long defaultRollCount = 10;
+  static final String defaultFileName = "FlumeData";
+  static final String defaultBucketFormat = "%yyyy-%mm-%dd/%HH";
+  static final long defaultBatchSize = 1;
+  static final long defaultTxnEventMax = 100;
+  static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
+  static final int defaultMaxOpenFiles = 5000;
+  static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
+
+  private long rollInterval;
+  private long rollSize;
+  private long rollCount;
+  private long txnEventMax;
+  private long batchSize;
+  private CompressionCodec codeC;
+  private CompressionType compType;
+  private String fileType;
+  private String path;
+  private int maxOpenFiles;
+  String writeFormat;
+
+  /*
+   * LinkedHashMap that caps the number of open BucketWriters: once the map
+   * grows past maxOpenFiles, the eldest entry is closed and evicted.
+   * NOTE(review): built with the default (insertion) ordering, so "eldest"
+   * is the first-inserted writer, not the least recently used one.
+   */
+  private class writerLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
+      if (size() <= maxOpenFiles) {
+        return false;
+      }
+      try {
+        eldest.getValue().close();
+      } catch (IOException eI) {
+        LOG.warn(eldest.getKey().toString(), eI);
+      }
+      return true;
+    }
+  }
+
+  final writerLinkedHashMap sfWriters = new writerLinkedHashMap();
+
+  // Used to short-circuit around doing regex matches when we know there are
+  // no templates to be replaced.
+  // private boolean shouldSub = false;
+
+  public HDFSEventSink() {
+
+  }
+
+  // Reads the sink settings from the context, falling back to the class
+  // defaults for missing keys. Only "hdfs.path" is mandatory.
+  @Override
+  public void configure(Context context) {
+    String dirpath = context.get("hdfs.path", String.class);
+    String fileName = context.get("hdfs.filePrefix", String.class);
+    String rollInterval = context.get("hdfs.rollInterval", String.class);
+    String rollSize = context.get("hdfs.rollSize", String.class);
+    String rollCount = context.get("hdfs.rollCount", String.class);
+    String batchSize = context.get("hdfs.batchSize", String.class);
+    String txnEventMax = context.get("hdfs.txnEventMax", String.class);
+    String codecName = context.get("hdfs.codeC", String.class);
+    String fileType = context.get("hdfs.fileType", String.class);
+    String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
+    String writeFormat = context.get("hdfs.writeFormat", String.class);
+
+    // Fail fast: without this check the sink would silently write to a
+    // path starting with the literal text "null".
+    if (dirpath == null) {
+      throw new IllegalArgumentException("hdfs.path is required");
+    }
+    if (fileName == null) {
+      fileName = defaultFileName;
+    }
+    this.path = dirpath + "/" + fileName;
+
+    if (rollInterval == null) {
+      this.rollInterval = defaultRollInterval;
+    } else {
+      this.rollInterval = Long.parseLong(rollInterval);
+    }
+
+    if (rollSize == null) {
+      this.rollSize = defaultRollSize;
+    } else {
+      this.rollSize = Long.parseLong(rollSize);
+    }
+
+    if (rollCount == null) {
+      this.rollCount = defaultRollCount;
+    } else {
+      this.rollCount = Long.parseLong(rollCount);
+    }
+
+    // "0" is treated like a missing value for the two settings below.
+    if ((batchSize == null) || batchSize.equals("0")) {
+      this.batchSize = defaultBatchSize;
+    } else {
+      this.batchSize = Long.parseLong(batchSize);
+    }
+
+    if ((txnEventMax == null) || txnEventMax.equals("0")) {
+      this.txnEventMax = defaultTxnEventMax;
+    } else {
+      this.txnEventMax = Long.parseLong(txnEventMax);
+    }
+
+    if (codecName == null) {
+      codeC = null;
+      compType = CompressionType.NONE;
+    } else {
+      codeC = getCodec(codecName);
+      // TODO : set proper compression type
+      compType = CompressionType.BLOCK;
+    }
+
+    this.fileType = (fileType == null) ? defaultFileType : fileType;
+
+    if (maxOpenFiles == null) {
+      this.maxOpenFiles = defaultMaxOpenFiles;
+    } else {
+      this.maxOpenFiles = Integer.parseInt(maxOpenFiles);
+    }
+
+    this.writeFormat = (writeFormat == null) ? defaultWriteFormat : writeFormat;
+  }
+
+  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
+      String codecName) {
+    String simpleName = cls.getSimpleName();
+    if (cls.getName().equals(codecName)
+        || simpleName.equalsIgnoreCase(codecName)) {
+      return true;
+    }
+    if (simpleName.endsWith("Codec")) {
+      String prefix = simpleName.substring(0,
+          simpleName.length() - "Codec".length());
+      if (prefix.equalsIgnoreCase(codecName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static CompressionCodec getCodec(String codecName) {
+    Configuration conf = new Configuration();
+    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
+        .getCodecClasses(conf);
+    // Wish we could base this on DefaultCodec but appears not all codec's
+    // extend DefaultCodec(Lzo)
+    CompressionCodec codec = null;
+    ArrayList<String> codecStrs = new ArrayList<String>();
+    codecStrs.add("None");
+    for (Class<? extends CompressionCodec> cls : codecs) {
+      codecStrs.add(cls.getSimpleName());
+
+      if (codecMatches(cls, codecName)) {
+        try {
+          codec = cls.newInstance();
+        } catch (InstantiationException e) {
+          LOG.error("Unable to instantiate " + cls + " class");
+        } catch (IllegalAccessException e) {
+          LOG.error("Unable to access " + cls + " class");
+        }
+      }
+    }
+
+    if (codec == null) {
+      if (!codecName.equalsIgnoreCase("None")) {
+        throw new IllegalArgumentException("Unsupported compression codec "
+            + codecName + ".  Please choose from: " + codecStrs);
+      }
+    } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
+      // Must check instanceof codec as BZip2Codec doesn't inherit Configurable
+      // Must set the configuration for Configurable objects that may or do use
+      // native libs
+      ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
+    }
+    return codec;
+  }
+
+  /**
+   * Pull events out of channel and send it to HDFS - take at the most
+   * txnEventMax, that's the maximum #events to hold in channel for a given
+   * transaction - find the corresponding bucket for the event, ensure the file
+   * is open - extract the pay-load and append to HDFS file
+   */
+  @Override
+  public Status process() throws EventDeliveryException {
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+    Map<String, BucketWriter> batchMap = new HashMap<String, BucketWriter>();
+    BucketFlushStatus syncedUp;
+
+    try {
+      transaction.begin();
+      for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null)
+          break;
+
+        // reconstruct the path name by substituting place holders
+        String realPath = BucketPath.escapeString(path, event.getHeaders());
+        BucketWriter bw = sfWriters.get(realPath);
+
+        // we haven't seen this file yet, so open it and cache the handle
+        if (bw == null) {
+          HDFSWriter writer = HDFSWriterFactory.getWriter(fileType);
+          FlumeFormatter formatter = HDFSFormatterFactory
+              .getFormatter(writeFormat);
+          bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
+          bw.open(realPath, codeC, compType, writer, formatter);
+          sfWriters.put(realPath, bw);
+        }
+
+        // Write the data to HDFS
+        syncedUp = bw.append(event);
+
+        // keep track of the files in current batch that are not flushed
+        // we need to flush all those at the end of the transaction
+        if (syncedUp == BucketFlushStatus.BatchStarted)
+          batchMap.put(bw.getFilePath(), bw);
+        else if ((batchSize > 1)
+            && (syncedUp == BucketFlushStatus.BatchFlushed))
+          batchMap.remove(bw.getFilePath());
+      }
+
+      // flush any pending writes in the given transaction
+      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+        e.getValue().flush();
+      }
+      batchMap.clear();
+      transaction.commit();
+      return Status.READY;
+    } catch (IOException eIO) {
+      transaction.rollback();
+      LOG.error("HDFS IO error", eIO);
+      return Status.BACKOFF;
+    } catch (Exception e) {
+      transaction.rollback();
+      LOG.error("process failed", e);
+      throw new EventDeliveryException(e.getMessage());
+    } finally {
+      // clear any leftover writes in the given transaction
+      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+        e.getValue().abort();
+      }
+      transaction.close();
+    }
+  }
+
+  /**
+   * Closes every cached bucket writer, then stops the sink. A failure to
+   * close one writer is logged and does not prevent the remaining writers
+   * from being closed.
+   */
+  @Override
+  public void stop() {
+    for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
+      LOG.info("Closing " + e.getKey());
+      // handle the failure per writer so that one bad file cannot leave all
+      // the following files open
+      try {
+        e.getValue().close();
+      } catch (IOException eIO) {
+        LOG.warn("IOException in closing file", eIO);
+      }
+    }
+    super.stop();
+  }
+
+  /**
+   * Re-opens every bucket writer currently held in the writer cache, then
+   * starts the sink. A writer that fails to open is logged and skipped.
+   */
+  @Override
+  public void start() {
+    for (BucketWriter cached : sfWriters.values()) {
+      try {
+        cached.open();
+      } catch (IOException openFailure) {
+        LOG.warn("IOException in opening file", openFailure);
+      }
+    }
+    super.start();
+  }
+
+}
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.sink.FlumeFormatter;
+
+/**
+ * Creates FlumeFormatter instances from a configured format name.
+ */
+public class HDFSFormatterFactory {
+
+  HDFSFormatterFactory() {
+
+  }
+
+  /** Format name selecting HDFSWritableFormatter. */
+  static final String hdfsWritableFormat = "Writable";
+  /** Format name reserved for Avro; getFormatter does not handle it. */
+  static final String hdfsAvroFormat = "Avro";
+
+  /**
+   * Returns a formatter for the given format name.
+   *
+   * @param formatType format name; only "Writable" is recognised
+   * @return a new HDFSWritableFormatter
+   * @throws IOException if the name is null or not a recognised format
+   */
+  static FlumeFormatter getFormatter(String formatType) throws IOException {
+    // compare by value: a name read from the configuration is a different
+    // String object than the constant, so == would reject it
+    if (hdfsWritableFormat.equals(formatType)) {
+      return new HDFSWritableFormatter();
+    } else {
+      throw new IOException("Incorrect formatter type");
+    }
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSSequenceFile implements HDFSWriter {
+
+  private SequenceFile.Writer writer;
+
+  public HDFSSequenceFile() {
+    writer = null;
+  }
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    open(filePath, null, CompressionType.NONE, fmt);
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codeC,
+      CompressionType compType, FlumeFormatter fmt) throws IOException {
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+
+    if (conf.getBoolean("hdfs.append.support", false) == true) {
+      FSDataOutputStream outStream = hdfs.append(dstPath);
+      writer = SequenceFile.createWriter(conf, outStream, fmt.getKeyClass(),
+          fmt.getValueClass(), compType, codeC);
+    } else {
+      writer = SequenceFile.createWriter(hdfs, conf, dstPath,
+          fmt.getKeyClass(), fmt.getValueClass(), compType, codeC);
+    }
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter formatter) throws IOException {
+    writer.append(formatter.getKey(e), formatter.getValue(e));
+  }
+
+  @Override
+  public void sync() throws IOException {
+    writer.syncFs();
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+    writer = null;
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * FlumeFormatter that represents an event as a LongWritable key (the event
+ * timestamp) and a BytesWritable value (the event body).
+ */
+public class HDFSWritableFormatter implements FlumeFormatter {
+
+  // Wraps the event body in a new BytesWritable.
+  private BytesWritable makeByteWritable(Event e) {
+    BytesWritable bytesObject = new BytesWritable();
+    bytesObject.set(e.getBody(), 0, e.getBody().length);
+    return bytesObject;
+  }
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<BytesWritable> getValueClass() {
+    return BytesWritable.class;
+  }
+
+  /**
+   * Builds the record key from the event's "timestamp" header, falling back
+   * to the current time when the header is absent.
+   *
+   * @param e event to derive the key from
+   * @return a LongWritable holding the timestamp in milliseconds
+   * @throws NumberFormatException if the header is present but not a long
+   */
+  @Override
+  public Object getKey(Event e) {
+    String timestamp = e.getHeaders().get("timestamp");
+    long eventStamp;
+
+    if (timestamp == null) {
+      eventStamp = System.currentTimeMillis();
+    } else {
+      // parseLong yields the primitive directly, no boxing round trip
+      eventStamp = Long.parseLong(timestamp);
+    }
+    LongWritable longObject = new LongWritable(eventStamp);
+    return longObject;
+  }
+
+  /**
+   * @param e event to convert
+   * @return a BytesWritable holding the event body
+   */
+  @Override
+  public Object getValue(Event e) {
+    return makeByteWritable(e);
+  }
+
+  /**
+   * Returns the event body as a byte array of exactly the body's length.
+   *
+   * @param e event to convert
+   * @return a copy of the valid bytes of the body
+   */
+  @Override
+  public byte[] getBytes(Event e) {
+    BytesWritable wrapped = makeByteWritable(e);
+    // BytesWritable.getBytes() exposes the backing buffer, which may be
+    // longer than the valid data; copy only getLength() bytes so no padding
+    // is handed to the caller
+    byte[] exact = new byte[wrapped.getLength()];
+    System.arraycopy(wrapped.getBytes(), 0, exact, 0, exact.length);
+    return exact;
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * Contract for the file writers used by the HDFS sink: open a file, append
+ * events to it, sync it and close it.
+ */
+public interface HDFSWriter {
+
+  /**
+   * Opens the file at the given path.
+   *
+   * @param filePath path of the file to open
+   * @param fmt formatter associated with the file
+   * @throws IOException if the file cannot be opened
+   */
+  void open(String filePath, FlumeFormatter fmt) throws IOException;
+
+  /**
+   * Opens the file at the given path with compression settings.
+   *
+   * @param filePath path of the file to open
+   * @param codec compression codec
+   * @param cType compression type
+   * @param fmt formatter associated with the file
+   * @throws IOException if the file cannot be opened
+   */
+  void open(String filePath, CompressionCodec codec, CompressionType cType,
+      FlumeFormatter fmt) throws IOException;
+
+  /**
+   * Appends one event to the open file.
+   *
+   * @param e event to write
+   * @param fmt formatter used to convert the event
+   * @throws IOException if the event cannot be written
+   */
+  void append(Event e, FlumeFormatter fmt) throws IOException;
+
+  /**
+   * Syncs the data written so far.
+   *
+   * @throws IOException if the sync fails
+   */
+  void sync() throws IOException;
+
+  /**
+   * Closes the file.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+/**
+ * Creates HDFSWriter instances from a configured file type name.
+ */
+abstract class HDFSWriterFactory {
+  /** File type name selecting HDFSSequenceFile. */
+  static final String SequenceFileType = "SequenceFile";
+  /** File type name selecting HDFSDataStream. */
+  static final String DataStreamType = "DataStream";
+  /** File type name selecting HDFSCompressedDataStream. */
+  static final String CompStreamType = "CompressedStream";
+
+  public HDFSWriterFactory() {
+
+  }
+
+  /**
+   * Returns a new writer for the given file type name.
+   *
+   * @param fileType one of "SequenceFile", "DataStream", "CompressedStream"
+   * @return a new, unopened writer of the requested kind
+   * @throws IOException if the name is null or not a supported file type
+   */
+  public static HDFSWriter getWriter(String fileType) throws IOException {
+    // compare by value: a name read from the configuration is a different
+    // String object than the constants, so == would reject it
+    if (SequenceFileType.equals(fileType)) {
+      return new HDFSSequenceFile();
+    } else if (DataStreamType.equals(fileType)) {
+      return new HDFSDataStream();
+    } else if (CompStreamType.equals(fileType)) {
+      return new HDFSCompressedDataStream();
+    } else {
+      throw new IOException("File type " + fileType + " not supported");
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Tue Oct  4 22:26:50 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.Calendar;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.sink.hdfs.HDFSEventSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises HDFSEventSink against the file system named by the default
+ * Hadoop configuration. The tests only check that the calls complete without
+ * throwing; the contents of the written files are not verified (the
+ * verification code at the end of testAppend is commented out).
+ */
+public class TestHDFSEventSink {
+
+  // sink under test, recreated for every test in setUp()
+  private HDFSEventSink sink;
+  // base directory the tests write to
+  private String testPath;
+
+  /**
+   * Creates a fresh sink and sets the fixed test directory.
+   */
+  @Before
+  public void setUp() {
+    /*
+     * FIXME: Use a dynamic path to support concurrent test execution. Also,
+     * beware of the case where this path is used for something or when the
+     * Hadoop config points at file:/// rather than hdfs://. We need to find a
+     * better way of testing HDFS related functionality.
+     */
+    testPath = "/user/flume/testdata";
+    sink = new HDFSEventSink();
+  }
+
+  // Nothing is cleaned up; the test directory is left in place.
+  @After
+  public void tearDown() {
+  }
+
+  /**
+   * Configures the sink with only "hdfs.path" set, then starts and stops it
+   * without processing any event.
+   */
+  @Test
+  public void testLifecycle() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+
+    context.put("hdfs.path", testPath);
+    /*
+     * context.put("hdfs.rollInterval", String.class);
+     * context.get("hdfs.rollSize", String.class); context.get("hdfs.rollCount",
+     * String.class);
+     */
+    Configurables.configure(sink, context);
+
+    sink.start();
+    sink.stop();
+  }
+
+  /**
+   * Empties the test directory, then for three rounds puts txnMax events
+   * into a memory channel and lets the sink process them. Each event carries
+   * "timestamp" and "hostname" headers that depend on the round.
+   */
+  @Test
+  public void testAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+
+    final long txnMax = 25;
+    final long rollCount = 3;
+    final long batchSize = 2;
+    final String fileName = "FlumeData";
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(testPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    // NOTE(review): the %Y-%m-%d/%H place holders are presumably filled in
+    // from the event's timestamp header by BucketPath - confirm
+    context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+
+    Configurables.configure(sink, context);
+
+    // the channel is configured with the same context as the sink
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    // a single transaction object is reused for all three rounds
+    Transaction txn = channel.getTransaction();
+
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (int i = 1; i < 4; i++) {
+      txn.begin();
+      for (int j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        // Calendar months are zero-based, so i = 1 means February
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+
+        event.setBody(("Test." + i + "." + j).getBytes());
+        channel.put(event);
+      }
+      txn.commit();
+
+      // execute sink to process the events
+      sink.process();
+    }
+
+    sink.stop();
+
+    /*
+     * 
+     * // loop through all the files generated and check their contains
+     * FileStatus[] dirStat = fs.listStatus(dirPath); Path fList[] =
+     * FileUtil.stat2Paths(dirStat);
+     * 
+     * try { for (int cnt = 0; cnt < fList.length; cnt++) { SequenceFile.Reader
+     * reader = new SequenceFile.Reader(fs, fList[cnt], conf); LongWritable key
+     * = new LongWritable(); BytesWritable value = new BytesWritable();
+     * 
+     * while (reader.next(key, value)) { logger.info(key+ ":" +
+     * value.toString()); } reader.close(); } } catch (IOException ioe) {
+     * System.err.println("IOException during operation: " + ioe.toString());
+     * System.exit(1); }
+     */
+  }
+}