Posted to commits@flume.apache.org by es...@apache.org on 2011/10/05 00:26:50 UTC
svn commit: r1178987 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/formatter/output/
main/java/org/apache/flume/sink/ main/java/org/apache/flume/sink/hdfs/
test/java/org/apache/flume/sink/hdfs/
Author: esammer
Date: Tue Oct 4 22:26:50 2011
New Revision: 1178987
URL: http://svn.apache.org/viewvc?rev=1178987&view=rev
Log:
- Added initial implementation of an HDFS sink. (Contributed by Prasad Mujumdar prasadm@cloudera.com)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Preconditions;
+
+public class BucketPath {
+
+ /**
+ * These are useful to other classes which might want to search for tags in
+ * strings.
+ */
+ public static final String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";
+ public static final Pattern tagPattern = Pattern.compile(TAG_REGEX);
+
+ /**
+ * Returns true if {@code in} contains a substring matching TAG_REGEX
+ * (i.e., of the form %{...} or %x).
+ */
+ public static boolean containsTag(String in) {
+ return tagPattern.matcher(in).find();
+ }
+
+ public static String expandShorthand(char c) {
+ // It's a date
+ switch (c) {
+ case 'a':
+ return "weekday_short";
+ case 'A':
+ return "weekday_full";
+ case 'b':
+ return "monthname_short";
+ case 'B':
+ return "monthname_full";
+ case 'c':
+ return "datetime";
+ case 'd':
+ return "day_of_month_xx"; // two digit
+ case 'D':
+ return "date_short"; // "MM/dd/yy";
+ case 'H':
+ return "hour_24_xx";
+ case 'I':
+ return "hour_12_xx";
+ case 'j':
+ return "day_of_year_xxx"; // three digits
+ case 'k':
+ return "hour_24"; // 1 or 2 digits
+ case 'l':
+ return "hour_12"; // 1 or 2 digits
+ case 'm':
+ return "month_xx";
+ case 'M':
+ return "minute_xx";
+ case 'p':
+ return "am_pm";
+ case 's':
+ return "unix_seconds";
+ case 'S':
+ return "seconds_xx";
+ case 't':
+ // This is different from unix date (which would insert a tab character
+ // here)
+ return "unix_millis";
+ case 'y':
+ return "year_xx";
+ case 'Y':
+ return "year_xxxx";
+ case 'z':
+ return "timezone_delta";
+ default:
+// LOG.warn("Unrecognized escape in event format string: %" + c);
+ return "" + c;
+ }
+
+ }
+
+ /**
+ * Hardcoded lookups for %x style escape replacement. Add your own!
+ *
+ * All shorthands are Date format strings, currently.
+ *
+ * Returns the empty string if an escape is not recognized.
+ *
+ * Dates follow the same format as unix date, with a few exceptions.
+ *
+ */
+ public static String replaceShorthand(char c, Map<String, String> headers) {
+ // It's a date
+ String formatString = "";
+ switch (c) {
+ case '%':
+ return "%";
+ case 'a':
+ formatString = "EEE";
+ break;
+ case 'A':
+ formatString = "EEEE";
+ break;
+ case 'b':
+ formatString = "MMM";
+ break;
+ case 'B':
+ formatString = "MMMM";
+ break;
+ case 'c':
+ formatString = "EEE MMM d HH:mm:ss yyyy";
+ break;
+ case 'd':
+ formatString = "dd";
+ break;
+ case 'D':
+ formatString = "MM/dd/yy";
+ break;
+ case 'H':
+ formatString = "HH";
+ break;
+ case 'I':
+ formatString = "hh";
+ break;
+ case 'j':
+ formatString = "DDD";
+ break;
+ case 'k':
+ formatString = "H";
+ break;
+ case 'l':
+ formatString = "h";
+ break;
+ case 'm':
+ formatString = "MM";
+ break;
+ case 'M':
+ formatString = "mm";
+ break;
+ case 'p':
+ formatString = "a";
+ break;
+ case 's':
+ // note: requires the "timestamp" header (epoch milliseconds)
+ return "" + (Long.valueOf(headers.get("timestamp")) / 1000);
+ case 'S':
+ formatString = "ss";
+ break;
+ case 't':
+ // This is different from unix date (which would insert a tab character
+ // here)
+ return headers.get("timestamp");
+ case 'y':
+ formatString = "yy";
+ break;
+ case 'Y':
+ formatString = "yyyy";
+ break;
+ case 'z':
+ formatString = "ZZZ";
+ break;
+ default:
+// LOG.warn("Unrecognized escape in event format string: %" + c);
+ return "";
+ }
+ SimpleDateFormat format = new SimpleDateFormat(formatString);
+ Date date = new Date(Long.valueOf(headers.get("timestamp")));
+ return format.format(date);
+ }
+
+ /**
+ * Replace all substrings of form %{tagname} with get(tagname).toString() and
+ * all shorthand substrings of form %x with a special value.
+ *
+ * Any unrecognized / not found tags will be replaced with the empty string.
+ *
+ * TODO(henry): we may want to consider taking this out of Event and into a
+ * more general class when we get more use cases for this pattern.
+ */
+ public static String escapeString(String in, Map<String, String> headers) {
+ Matcher matcher = tagPattern.matcher(in);
+ StringBuffer sb = new StringBuffer();
+ while (matcher.find()) {
+ String replacement = "";
+ // Group 2 is the %{...} pattern
+ if (matcher.group(2) != null) {
+
+ replacement = headers.get(matcher.group(2));
+ if (replacement == null) {
+ replacement = "";
+// LOG.warn("Tag " + matcher.group(2) + " not found");
+ }
+ } else {
+ // The %x pattern.
+ // Since we know the match is a single character, we can
+ // switch on that rather than the string.
+ Preconditions.checkState(matcher.group(1) != null
+ && matcher.group(1).length() == 1,
+ "Expected to match single character tag in string " + in);
+ char c = matcher.group(1).charAt(0);
+ replacement = replaceShorthand(c, headers);
+ }
+
+ // The replacement string must have '$' and '\' chars escaped. This
+ // replacement string is pretty arcane.
+ //
+ // replacee : '$' -> for java '\$' -> for regex "\\$"
+ // replacement: '\$' -> for regex '\\\$' -> for java "\\\\\\$"
+ //
+ // replacee : '\' -> for java "\\" -> for regex "\\\\"
+ // replacement: '\\' -> for regex "\\\\" -> for java "\\\\\\\\"
+
+ // note: order matters
+ replacement = replacement.replaceAll("\\\\", "\\\\\\\\");
+ replacement = replacement.replaceAll("\\$", "\\\\\\$");
+
+ matcher.appendReplacement(sb, replacement);
+ }
+ matcher.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+ * Instead of replacing escape sequences in a string, this method returns a
+ * mapping of an attribute name to the value based on the escape sequence
+ * found in the argument string.
+ */
+ public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers) {
+ Map<String, String> mapping = new HashMap<String, String>();
+ Matcher matcher = tagPattern.matcher(in);
+ while (matcher.find()) {
+ String replacement = "";
+ // Group 2 is the %{...} pattern
+ if (matcher.group(2) != null) {
+
+ replacement = headers.get(matcher.group(2));
+
+ if (replacement == null) {
+ replacement = "";
+// LOG.warn("Tag " + matcher.group(2) + " not found");
+ }
+ mapping.put(matcher.group(2), replacement);
+ } else {
+ // The %x pattern.
+ // Since we know the match is a single character, we can
+ // switch on that rather than the string.
+ Preconditions.checkState(matcher.group(1) != null
+ && matcher.group(1).length() == 1,
+ "Expected to match single character tag in string " + in);
+ char c = matcher.group(1).charAt(0);
+ replacement = replaceShorthand(c, headers);
+ mapping.put(expandShorthand(c), replacement);
+ }
+ }
+ return mapping;
+
+ }
+}
+
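A minimal usage sketch for the escaping above, assuming only what the class
itself reads (a "timestamp" header holding epoch milliseconds); the
"hostname" header and the path template are illustrative:

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.flume.formatter.output.BucketPath;

  Map<String, String> headers = new HashMap<String, String>();
  headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  headers.put("hostname", "host1"); // illustrative custom header

  // %{hostname} is looked up directly in the headers map; %Y, %m, %d and
  // %H are date shorthands resolved against the "timestamp" header.
  String realPath = BucketPath.escapeString(
      "/flume/%{hostname}/%Y-%m-%d/%H", headers);
  // e.g. "/flume/host1/2011-10-04/22"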
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink;
+
+import org.apache.flume.Event;
+
+public interface FlumeFormatter {
+
+ @SuppressWarnings("rawtypes")
+ Class getKeyClass();
+
+ @SuppressWarnings("rawtypes")
+ Class getValueClass();
+
+ Object getKey(Event e);
+
+ Object getValue(Event e);
+
+ byte[] getBytes(Event e);
+
+}
\ No newline at end of file
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class BucketWriter {
+
+ public static enum BucketFlushStatus {
+ BatchStarted, BatchPending, BatchFlushed
+ }
+
+ private HDFSWriter writer;
+ private FlumeFormatter formatter;
+ private long eventCounter;
+ private long processSize;
+ private long lastProcessTime;
+ private long fileExtensionCounter;
+ private long batchCounter;
+ private String filePath;
+ private long rollInterval;
+ private long rollSize;
+ private long rollCount;
+ private long batchSize;
+ private CompressionCodec codeC;
+ private CompressionType compType;
+
+ // clear the class counters
+ private void resetCounters() {
+ eventCounter = 0;
+ processSize = 0;
+ lastProcessTime = 0;
+ batchCounter = 0;
+ }
+
+ // constructor: initialize the roll and batch thresholds (the file handle
+ // is opened separately via open())
+ public BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize)
+ throws IOException {
+ rollInterval = rollInt;
+ rollSize = rollSz;
+ rollCount = rollCnt;
+ batchSize = bSize;
+
+ resetCounters();
+ fileExtensionCounter = 0;
+ // open();
+ }
+
+ public void open() throws IOException {
+ if ((filePath == null) || (writer == null) || (formatter == null)) {
+ throw new IOException("Invalid file settings");
+ }
+
+ String bucketPath = filePath + "." + fileExtensionCounter;
+ if (codeC == null) {
+ writer.open(bucketPath, formatter);
+ } else {
+ writer.open(bucketPath, codeC, compType, formatter);
+ }
+ }
+
+ public void open(String fPath, HDFSWriter hWriter, FlumeFormatter fmt)
+ throws IOException {
+ open(fPath, null, CompressionType.NONE, hWriter, fmt);
+ }
+
+ public void open(String fPath, CompressionCodec codec, CompressionType cType,
+ HDFSWriter hWriter, FlumeFormatter fmt) throws IOException {
+ filePath = fPath;
+ codeC = codec;
+ compType = cType;
+ writer = hWriter;
+ formatter = fmt;
+ open();
+ }
+
+ // close the file handle
+ public void close() throws IOException {
+ resetCounters();
+ if (writer != null) {
+ writer.close();
+ fileExtensionCounter++;
+ }
+ }
+
+ // close the file, ignore the IOException
+ public void abort() {
+ try {
+ close();
+ } catch (IOException eIO) {
+ // Ignore it
+ }
+ }
+
+ // flush the data
+ public void flush() throws IOException {
+ writer.sync();
+ batchCounter = 0;
+ }
+
+ // handle the batching; do the real flush if it's time
+ public BucketFlushStatus sync() throws IOException {
+ BucketFlushStatus syncStatus;
+
+ if (batchCounter == batchSize) {
+ flush();
+ syncStatus = BucketFlushStatus.BatchFlushed;
+ } else {
+ if (batchCounter == 1) {
+ syncStatus = BucketFlushStatus.BatchStarted;
+ } else {
+ syncStatus = BucketFlushStatus.BatchPending;
+ }
+ }
+ return syncStatus;
+ }
+
+ // append the data, update stats, handle roll and batching
+ public BucketFlushStatus append(Event e) throws IOException {
+ BucketFlushStatus syncStatus;
+
+ writer.append(e, formatter);
+
+ // update statistics
+ processSize += e.getBody().length;
+ lastProcessTime = System.currentTimeMillis(); // millis; shouldRotate() compares in seconds
+ eventCounter++;
+ batchCounter++;
+
+ // check if it's time to rotate the file
+ if (shouldRotate()) {
+ close();
+ open();
+ syncStatus = BucketFlushStatus.BatchFlushed;
+ } else {
+ syncStatus = sync();
+ }
+
+ return syncStatus;
+ }
+
+ // check whether it's time to rotate the file
+ public boolean shouldRotate() {
+ boolean doRotate = false;
+
+ if ((rollInterval > 0)
+ && (rollInterval < (System.currentTimeMillis() - lastProcessTime) / 1000))
+ doRotate = true;
+ if ((rollCount > 0) && (rollCount < eventCounter)) {
+ eventCounter = 0;
+ doRotate = true;
+ }
+ if ((rollSize > 0) && (rollSize < processSize)) {
+ processSize = 0;
+ doRotate = true;
+ }
+
+ return doRotate;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+}
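A sketch of the BucketWriter lifecycle as HDFSEventSink (below) drives it.
It assumes code living in the org.apache.flume.sink.hdfs package, since the
factory entry points added here are package-private; the path and
thresholds are illustrative:

  import org.apache.flume.Event;
  import org.apache.flume.event.SimpleEvent;
  import org.apache.flume.sink.FlumeFormatter;
  import org.apache.flume.sink.hdfs.BucketWriter.BucketFlushStatus;

  // roll after 30 s, 1024 bytes, or 10 events; sync a batch every 2 events
  BucketWriter bucket = new BucketWriter(30, 1024, 10, 2);
  HDFSWriter writer = HDFSWriterFactory.getWriter("SequenceFile");
  FlumeFormatter fmt = HDFSFormatterFactory.getFormatter("Writable");
  bucket.open("/flume/events/FlumeData", writer, fmt); // uncompressed form

  Event e = new SimpleEvent();
  e.setBody("hello flume".getBytes());

  // append() updates the roll counters and may roll the file or flush
  BucketFlushStatus status = bucket.append(e);
  if (status == BucketFlushStatus.BatchStarted) {
    bucket.flush(); // force out the pending batch, e.g. before a commit
  }
  bucket.close();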
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+public class HDFSCompressedDataStream implements HDFSWriter {
+
+ private CompressionOutputStream outStream;
+
+ @Override
+ public void open(String filePath, FlumeFormatter fmt) throws IOException {
+ DefaultCodec defCodec = new DefaultCodec();
+ CompressionType cType = CompressionType.BLOCK;
+ open(filePath, defCodec, cType, fmt);
+ }
+
+ @Override
+ public void open(String filePath, CompressionCodec codec,
+ CompressionType cType, FlumeFormatter fmt) throws IOException {
+
+ FSDataOutputStream fsOutStream;
+ Configuration conf = new Configuration();
+ Path dstPath = new Path(filePath);
+ FileSystem hdfs = dstPath.getFileSystem(conf);
+
+ if (conf.getBoolean("hdfs.append.support", false) == true) {
+ fsOutStream = hdfs.append(dstPath);
+ } else {
+ fsOutStream = hdfs.create(dstPath);
+ }
+ outStream = codec.createOutputStream(fsOutStream);
+ }
+
+ @Override
+ public void append(Event e, FlumeFormatter fmt) throws IOException {
+ byte[] bValue = fmt.getBytes(e);
+ outStream.write(bValue, 0, bValue.length);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ outStream.finish();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outStream.close();
+ }
+
+}
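The codec-taking open() overload above expects a configured Hadoop codec; a
short sketch with GzipCodec (any codec on the classpath should behave the
same way; the path and the formatter choice are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.SequenceFile.CompressionType;
  import org.apache.hadoop.io.compress.GzipCodec;

  GzipCodec codec = new GzipCodec();
  codec.setConf(new Configuration()); // Configurable codecs need a conf

  HDFSWriter writer = new HDFSCompressedDataStream();
  writer.open("/flume/events/FlumeData.gz", codec, CompressionType.BLOCK,
      new HDFSWritableFormatter());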
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSDataStream implements HDFSWriter {
+ private FSDataOutputStream outStream;
+
+ @Override
+ public void open(String filePath, FlumeFormatter fmt) throws IOException {
+ Configuration conf = new Configuration();
+ Path dstPath = new Path(filePath);
+ FileSystem hdfs = dstPath.getFileSystem(conf);
+
+ if (conf.getBoolean("hdfs.append.support", false) == true) {
+ outStream = hdfs.append(dstPath);
+ } else {
+ outStream = hdfs.create(dstPath);
+ }
+ }
+
+ @Override
+ public void open(String filePath, CompressionCodec codec,
+ CompressionType cType, FlumeFormatter fmt) throws IOException {
+ open(filePath, fmt);
+ }
+
+ @Override
+ public void append(Event e, FlumeFormatter fmt) throws IOException {
+ byte[] bValue = fmt.getBytes(e);
+ outStream.write(bValue, 0, bValue.length);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ outStream.sync();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outStream.close();
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.flume.sink.hdfs.BucketWriter.BucketFlushStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HDFSEventSink extends AbstractSink implements PollableSink,
+ Configurable {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(HDFSEventSink.class);
+
+ static final long defaultRollInterval = 30;
+ static final long defaultRollSize = 1024;
+ static final long defaultRollCount = 10;
+ static final String defaultFileName = "FlumeData";
+ static final String defaultBucketFormat = "%Y-%m-%d/%H";
+ static final long defaultBatchSize = 1;
+ static final long defaultTxnEventMax = 100;
+ static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
+ static final int defaultMaxOpenFiles = 5000;
+ static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
+
+ private long rollInterval;
+ private long rollSize;
+ private long rollCount;
+ private long txnEventMax;
+ private long batchSize;
+ private CompressionCodec codeC;
+ private CompressionType compType;
+ private String fileType;
+ private String path;
+ private int maxOpenFiles;
+ String writeFormat;
+
+ /*
+ * Extended Java LinkedHashMap used as an LRU queue of open file handles.
+ * We close and evict the eldest file handle when too many are open.
+ */
+ private class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
+ if (super.size() > maxOpenFiles) {
+ // If we have more than the max open files, close the eldest one and
+ // return true
+ try {
+ eldest.getValue().close();
+ } catch (IOException eI) {
+ LOG.warn(eldest.getKey(), eI);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ final WriterLinkedHashMap sfWriters = new WriterLinkedHashMap();
+
+ // Used to short-circuit around doing regex matches when we know there are
+ // no templates to be replaced.
+ // private boolean shouldSub = false;
+
+ public HDFSEventSink() {
+
+ }
+
+ // read configuration and setup thresholds
+ @Override
+ public void configure(Context context) {
+ String dirpath = context.get("hdfs.path", String.class);
+ String fileName = context.get("hdfs.filePrefix", String.class);
+ String rollInterval = context.get("hdfs.rollInterval", String.class);
+ String rollSize = context.get("hdfs.rollSize", String.class);
+ String rollCount = context.get("hdfs.rollCount", String.class);
+ String batchSize = context.get("hdfs.batchSize", String.class);
+ String txnEventMax = context.get("hdfs.txnEventMax", String.class);
+ String codecName = context.get("hdfs.codeC", String.class);
+ String fileType = context.get("hdfs.fileType", String.class);
+ String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
+ String writeFormat = context.get("hdfs.writeFormat", String.class);
+
+ if (dirpath == null) {
+ throw new IllegalArgumentException("hdfs.path is required");
+ }
+ if (fileName == null) {
+ fileName = defaultFileName;
+ }
+ this.path = dirpath + "/" + fileName;
+
+ if (rollInterval == null) {
+ this.rollInterval = defaultRollInterval;
+ } else {
+ this.rollInterval = Long.parseLong(rollInterval);
+ }
+
+ if (rollSize == null) {
+ this.rollSize = defaultRollSize;
+ } else {
+ this.rollSize = Long.parseLong(rollSize);
+ }
+
+ if (rollCount == null) {
+ this.rollCount = defaultRollCount;
+ } else {
+ this.rollCount = Long.parseLong(rollCount);
+ }
+
+ if ((batchSize == null) || batchSize.equals("0")) {
+ this.batchSize = defaultBatchSize;
+ } else {
+ this.batchSize = Long.parseLong(batchSize);
+ }
+
+ if ((txnEventMax == null) || txnEventMax.equals("0")) {
+ this.txnEventMax = defaultTxnEventMax;
+ } else {
+ this.txnEventMax = Long.parseLong(txnEventMax);
+ }
+
+ if (codecName == null) {
+ codeC = null;
+ compType = CompressionType.NONE;
+ } else {
+ codeC = getCodec(codecName);
+ // TODO : set proper compression type
+ compType = CompressionType.BLOCK;
+ }
+
+ if (fileType == null) {
+ this.fileType = defaultFileType;
+ } else {
+ this.fileType = fileType;
+ }
+
+ if (maxOpenFiles == null) {
+ this.maxOpenFiles = defaultMaxOpenFiles;
+ } else {
+ this.maxOpenFiles = Integer.parseInt(maxOpenFiles);
+ }
+
+ if (writeFormat == null) {
+ this.writeFormat = defaultWriteFormat;
+ } else {
+ this.writeFormat = writeFormat;
+ }
+ }
+
+ private static boolean codecMatches(Class<? extends CompressionCodec> cls,
+ String codecName) {
+ String simpleName = cls.getSimpleName();
+ if (cls.getName().equals(codecName)
+ || simpleName.equalsIgnoreCase(codecName)) {
+ return true;
+ }
+ if (simpleName.endsWith("Codec")) {
+ String prefix = simpleName.substring(0,
+ simpleName.length() - "Codec".length());
+ if (prefix.equalsIgnoreCase(codecName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static CompressionCodec getCodec(String codecName) {
+ Configuration conf = new Configuration();
+ List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
+ .getCodecClasses(conf);
+ // Wish we could base this on DefaultCodec but appears not all codec's
+ // extend DefaultCodec(Lzo)
+ CompressionCodec codec = null;
+ ArrayList<String> codecStrs = new ArrayList<String>();
+ codecStrs.add("None");
+ for (Class<? extends CompressionCodec> cls : codecs) {
+ codecStrs.add(cls.getSimpleName());
+
+ if (codecMatches(cls, codecName)) {
+ try {
+ codec = cls.newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Unable to instantiate " + cls + " class");
+ } catch (IllegalAccessException e) {
+ LOG.error("Unable to access " + cls + " class");
+ }
+ }
+ }
+
+ if (codec == null) {
+ if (!codecName.equalsIgnoreCase("None")) {
+ throw new IllegalArgumentException("Unsupported compression codec "
+ + codecName + ". Please choose from: " + codecStrs);
+ }
+ } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
+ // Must check instanceof codec as BZip2Codec doesn't inherit Configurable
+ // Must set the configuration for Configurable objects that may or do use
+ // native libs
+ ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
+ }
+ return codec;
+ }
+
+ /**
+ * Pull events out of the channel and send them to HDFS: take at most
+ * txnEventMax events from the channel per transaction, find the
+ * corresponding bucket for each event and ensure its file is open, then
+ * extract the payload and append it to the HDFS file.
+ */
+ @Override
+ public Status process() throws EventDeliveryException {
+ Channel channel = getChannel();
+ Transaction transaction = channel.getTransaction();
+ Map<String, BucketWriter> batchMap = new HashMap<String, BucketWriter>();
+ BucketFlushStatus syncedUp;
+
+ try {
+ transaction.begin();
+ for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
+ Event event = channel.take();
+ if (event == null)
+ break;
+
+ // reconstruct the path name by substituting place holders
+ String realPath = BucketPath.escapeString(path, event.getHeaders());
+ BucketWriter bw = sfWriters.get(realPath);
+
+ // we haven't seen this file yet, so open it and cache the handle
+ if (bw == null) {
+ HDFSWriter writer = HDFSWriterFactory.getWriter(fileType);
+ FlumeFormatter formatter = HDFSFormatterFactory
+ .getFormatter(writeFormat);
+ bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
+ bw.open(realPath, codeC, compType, writer, formatter);
+ sfWriters.put(realPath, bw);
+ }
+
+ // Write the data to HDFS
+ syncedUp = bw.append(event);
+
+ // keep track of the files in current batch that are not flushed
+ // we need to flush all those at the end of the transaction
+ if (syncedUp == BucketFlushStatus.BatchStarted)
+ batchMap.put(bw.getFilePath(), bw);
+ else if ((batchSize > 1)
+ && (syncedUp == BucketFlushStatus.BatchFlushed))
+ batchMap.remove(bw.getFilePath());
+ }
+
+ // flush any pending writes in the given transaction
+ for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+ e.getValue().flush();
+ }
+ batchMap.clear();
+ transaction.commit();
+ return Status.READY;
+ } catch (IOException eIO) {
+ transaction.rollback();
+ LOG.error("HDFS IO error", eIO);
+ return Status.BACKOFF;
+ } catch (Exception e) {
+ transaction.rollback();
+ LOG.error("process failed", e);
+ throw new EventDeliveryException(e.getMessage(), e);
+ } finally {
+ // clear any leftover writes in the given transaction
+ for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+ e.getValue().abort();
+ }
+ transaction.close();
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
+ LOG.info("Closing " + e.getKey());
+ e.getValue().close();
+ }
+ } catch (IOException eIO) {
+ LOG.warn("IOException in opening file", eIO);
+ }
+ super.stop();
+ }
+
+ @Override
+ public void start() {
+ for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
+ try {
+ e.getValue().open();
+ } catch (IOException eIO) {
+ LOG.warn("IOException in opening file", eIO);
+ }
+ }
+ super.start();
+ }
+
+}
\ No newline at end of file
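For reference, configure() above reads its settings from the hdfs.* context
keys; a minimal programmatic setup mirroring the unit test at the end of
this commit (values illustrative; unset keys fall back to the defaults at
the top of the class):

  import org.apache.flume.Context;
  import org.apache.flume.conf.Configurables;

  Context context = new Context();
  context.put("hdfs.path", "/flume/events/%Y-%m-%d/%H");
  context.put("hdfs.filePrefix", "FlumeData");
  context.put("hdfs.rollCount", "10");    // roll after 10 events ...
  context.put("hdfs.rollSize", "1024");   // ... or 1024 bytes ...
  context.put("hdfs.rollInterval", "30"); // ... or 30 seconds
  context.put("hdfs.batchSize", "2");     // sync every 2 events
  context.put("hdfs.fileType", "SequenceFile");

  HDFSEventSink sink = new HDFSEventSink();
  Configurables.configure(sink, context);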
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.sink.FlumeFormatter;
+
+public class HDFSFormatterFactory {
+
+ HDFSFormatterFactory() {
+
+ }
+
+ static final String hdfsWritableFormat = "Writable";
+ static final String hdfsAvroFormat = "Avro";
+
+ static FlumeFormatter getFormatter(String formatType) throws IOException {
+ // compare with equals(), not ==, so non-interned strings also match
+ if (hdfsWritableFormat.equals(formatType)) {
+ return new HDFSWritableFormatter();
+ } else {
+ throw new IOException("Incorrect formatter type");
+ }
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSSequenceFile implements HDFSWriter {
+
+ private SequenceFile.Writer writer;
+
+ public HDFSSequenceFile() {
+ writer = null;
+ }
+
+ @Override
+ public void open(String filePath, FlumeFormatter fmt) throws IOException {
+ open(filePath, null, CompressionType.NONE, fmt);
+ }
+
+ @Override
+ public void open(String filePath, CompressionCodec codeC,
+ CompressionType compType, FlumeFormatter fmt) throws IOException {
+ Configuration conf = new Configuration();
+ Path dstPath = new Path(filePath);
+ FileSystem hdfs = dstPath.getFileSystem(conf);
+
+ if (conf.getBoolean("hdfs.append.support", false) == true) {
+ FSDataOutputStream outStream = hdfs.append(dstPath);
+ writer = SequenceFile.createWriter(conf, outStream, fmt.getKeyClass(),
+ fmt.getValueClass(), compType, codeC);
+ } else {
+ writer = SequenceFile.createWriter(hdfs, conf, dstPath,
+ fmt.getKeyClass(), fmt.getValueClass(), compType, codeC);
+ }
+ }
+
+ @Override
+ public void append(Event e, FlumeFormatter formatter) throws IOException {
+ writer.append(formatter.getKey(e), formatter.getValue(e));
+ }
+
+ @Override
+ public void sync() throws IOException {
+ writer.syncFs();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ writer = null;
+ }
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+public class HDFSWritableFormatter implements FlumeFormatter {
+
+ private BytesWritable makeByteWritable(Event e) {
+ BytesWritable bytesObject = new BytesWritable();
+ bytesObject.set(e.getBody(), 0, e.getBody().length);
+ return bytesObject;
+ }
+
+ @Override
+ public Class<LongWritable> getKeyClass() {
+ return LongWritable.class;
+ }
+
+ @Override
+ public Class<BytesWritable> getValueClass() {
+ return BytesWritable.class;
+ }
+
+ @Override
+ public Object getKey(Event e) {
+ // Write the data to HDFS
+ String timestamp = e.getHeaders().get("timestamp");
+ long eventStamp;
+
+ if (timestamp == null) {
+ eventStamp = System.currentTimeMillis();
+ } else {
+ eventStamp = Long.valueOf(timestamp);
+ }
+ LongWritable longObject = new LongWritable(eventStamp);
+ return longObject;
+ }
+
+ @Override
+ public Object getValue(Event e) {
+ return makeByteWritable(e);
+ }
+
+ @Override
+ public byte[] getBytes(Event e) {
+ return makeByteWritable(e).getBytes();
+ }
+}
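Records written through HDFSSequenceFile with this formatter come back as
(LongWritable timestamp, BytesWritable body) pairs; a sketch of reading one
rolled file (the path is illustrative), much like the commented-out
verification in the test at the end of this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.SequenceFile;

  Configuration conf = new Configuration();
  Path file = new Path("/flume/events/FlumeData.0");
  FileSystem fs = file.getFileSystem(conf);

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
  LongWritable key = new LongWritable();     // event timestamp (millis)
  BytesWritable value = new BytesWritable(); // event body
  while (reader.next(key, value)) {
    System.out.println(key.get() + ": "
        + new String(value.getBytes(), 0, value.getLength()));
  }
  reader.close();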
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public interface HDFSWriter {
+
+ public void open(String filePath, FlumeFormatter fmt) throws IOException;
+
+ // public void open(String filePath, CompressionCodec codec, CompressionType
+ // cType) throws IOException;
+
+ public void open(String filePath, CompressionCodec codec,
+ CompressionType cType, FlumeFormatter fmt) throws IOException;
+
+ // public void append(long key, byte [] val) throws IOException;
+
+ public void append(Event e, FlumeFormatter fmt) throws IOException;
+
+ public void sync() throws IOException;
+
+ public void close() throws IOException;
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+abstract class HDFSWriterFactory {
+ static final String SequenceFileType = "SequenceFile";
+ static final String DataStreamType = "DataStream";
+ static final String CompStreamType = "CompressedStream";
+
+ public HDFSWriterFactory() {
+
+ }
+
+ public static HDFSWriter getWriter(String fileType) throws IOException {
+ // compare with equals(), not ==, so non-interned strings also match
+ if (SequenceFileType.equals(fileType)) {
+ return new HDFSSequenceFile();
+ } else if (DataStreamType.equals(fileType)) {
+ return new HDFSDataStream();
+ } else if (CompStreamType.equals(fileType)) {
+ return new HDFSCompressedDataStream();
+ } else {
+ throw new IOException("File type " + fileType + " not supported");
+ }
+ }
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1178987&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Tue Oct 4 22:26:50 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.Calendar;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHDFSEventSink {
+
+ private HDFSEventSink sink;
+ private String testPath;
+
+ @Before
+ public void setUp() {
+ /*
+ * FIXME: Use a dynamic path to support concurrent test execution. Also,
+ * beware of the case where this path is used for something or when the
+ * Hadoop config points at file:/// rather than hdfs://. We need to find a
+ * better way of testing HDFS related functionality.
+ */
+ testPath = "/user/flume/testdata";
+ sink = new HDFSEventSink();
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ @Test
+ public void testLifecycle() throws InterruptedException, LifecycleException {
+ Context context = new Context();
+
+ context.put("hdfs.path", testPath);
+ /*
+ * context.put("hdfs.rollInterval", String.class);
+ * context.get("hdfs.rollSize", String.class); context.get("hdfs.rollCount",
+ * String.class);
+ */
+ Configurables.configure(sink, context);
+
+ sink.start();
+ sink.stop();
+ }
+
+ @Test
+ public void testAppend() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+
+ final long txnMax = 25;
+ final long rollCount = 3;
+ final long batchSize = 2;
+ final String fileName = "FlumeData";
+
+ // clear the test directory
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+ Path dirPath = new Path(testPath);
+ fs.delete(dirPath, true);
+ fs.mkdirs(dirPath);
+
+ Context context = new Context();
+
+ context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+ context.put("hdfs.filePrefix", fileName);
+ context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+ context.put("hdfs.rollCount", String.valueOf(rollCount));
+ context.put("hdfs.batchSize", String.valueOf(batchSize));
+
+ Configurables.configure(sink, context);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, context);
+
+ sink.setChannel(channel);
+ sink.start();
+
+ Transaction txn = channel.getTransaction();
+
+ Calendar eventDate = Calendar.getInstance();
+
+ // push the event batches into channel
+ for (int i = 1; i < 4; i++) {
+ txn.begin();
+ for (int j = 1; j <= txnMax; j++) {
+ Event event = new SimpleEvent();
+ eventDate.clear();
+ eventDate.set(2011, i, i, i, 0); // (year, month [0-based], day, hour, minute)
+ event.getHeaders().put("timestamp",
+ String.valueOf(eventDate.getTimeInMillis()));
+ event.getHeaders().put("hostname", "Host" + i);
+
+ event.setBody(("Test." + i + "." + j).getBytes());
+ channel.put(event);
+ }
+ txn.commit();
+
+ // execute sink to process the events
+ sink.process();
+ }
+
+ sink.stop();
+
+ /*
+ * // loop through all the files generated and check their contents
+ * FileStatus[] dirStat = fs.listStatus(dirPath);
+ * Path[] fList = FileUtil.stat2Paths(dirStat);
+ *
+ * try {
+ *   for (int cnt = 0; cnt < fList.length; cnt++) {
+ *     SequenceFile.Reader reader = new SequenceFile.Reader(fs, fList[cnt], conf);
+ *     LongWritable key = new LongWritable();
+ *     BytesWritable value = new BytesWritable();
+ *     while (reader.next(key, value)) {
+ *       logger.info(key + ":" + value.toString());
+ *     }
+ *     reader.close();
+ *   }
+ * } catch (IOException ioe) {
+ *   System.err.println("IOException during operation: " + ioe.toString());
+ *   System.exit(1);
+ * }
+ */
+ }
+}