You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@camel.apache.org by da...@apache.org on 2011/10/22 16:04:42 UTC
svn commit: r1187718 - in
/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs:
HdfsConsumer.java HdfsFileType.java
Author: davsclaus
Date: Sat Oct 22 14:04:42 2011
New Revision: 1187718
URL: http://svn.apache.org/viewvc?rev=1187718&view=rev
Log:
CAMEL-4555: camel-hadoop supports merging multiple segment files in the consumer. Thanks to Ben Hoyt for the patch.
Modified:
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1187718&r1=1187717&r2=1187718&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java Sat Oct 22 14:04:42 2011
@@ -16,9 +16,11 @@
*/
package org.apache.camel.component.hdfs;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.xml.ws.Holder;
import org.apache.camel.Exchange;
@@ -27,6 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -73,10 +76,16 @@ public final class HdfsConsumer extends
Path pattern = info.getPath().suffix("/" + this.config.getPattern());
fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
}
+
if (fileStatuses.length > 0) {
this.idle.set(false);
}
+
for (int i = 0; i < fileStatuses.length; ++i) {
+ FileStatus status = fileStatuses[i];
+ if (normalFileIsDirectoryNoSuccessFile(status, info)) {
+ continue;
+ }
try {
this.rwlock.writeLock().lock();
this.istream = HdfsInputStream.createInputStream(fileStatuses[i].getPath().toString(), this.config);
@@ -89,6 +98,8 @@ public final class HdfsConsumer extends
while (this.istream.next(key, value) != 0) {
Exchange exchange = this.getEndpoint().createExchange();
Message message = new DefaultMessage();
+ message.setHeader(Exchange.FILE_NAME, StringUtils
+ .substringAfterLast(status.getPath().toString(), "/"));
if (key.value != null) {
message.setHeader(HdfsHeader.KEY.name(), key.value);
}
@@ -103,6 +114,16 @@ public final class HdfsConsumer extends
return numMessages;
}
+ /**
+  * Decides whether the poll loop should skip an entry: returns true only when the
+  * endpoint is configured for NORMAL_FILE and the entry is a directory that does not
+  * contain a "_SUCCESS" marker file. For every other entry or file type it returns false.
+  *
+  * @throws IOException if the existence check against the file system fails
+  */
+ private boolean normalFileIsDirectoryNoSuccessFile(FileStatus status, HdfsInfo info) throws IOException {
+     // Enum identity comparison is idiomatic and does not NPE if the getter returns null.
+     if (config.getFileType() != HdfsFileType.NORMAL_FILE || !status.isDir()) {
+         return false;
+     }
+     // Resolve the marker as a child of the directory instead of re-parsing a concatenated string.
+     Path successPath = new Path(status.getPath(), "_SUCCESS");
+     return !info.getFileSystem().exists(successPath);
+ }
+
public HdfsInputStream getIstream() {
try {
rwlock.readLock().lock();
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1187718&r1=1187717&r2=1187718&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java Sat Oct 22 14:04:42 2011
@@ -18,6 +18,8 @@ package org.apache.camel.component.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -25,6 +27,7 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+
import javax.xml.ws.Holder;
import org.apache.camel.RuntimeCamelException;
@@ -32,6 +35,9 @@ import org.apache.camel.TypeConverter;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayFile;
import org.apache.hadoop.io.BloomMapFile;
import org.apache.hadoop.io.BooleanWritable;
@@ -63,13 +69,7 @@ public enum HdfsFileType {
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- throw new RuntimeException("Error closing stream", e);
- }
- }
+ IOHelper.close(is);
}
}
@@ -123,13 +123,46 @@ public enum HdfsFileType {
+ /**
+  * Opens the content at hdfsPath for reading.
+  * When the configured file-system type is LOCAL, the path is opened directly through the
+  * Hadoop FileSystem obtained from HdfsInfo. For any other type, the content is first copied
+  * to a local temporary file by getHfdsFileToTmpFile (which also calls FileUtil.copyMerge,
+  * intended for directories of segment files), and a FileInputStream over that copy is returned.
+  * Any IOException is rethrown wrapped in a RuntimeCamelException.
+  */
public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
try {
Closeable rin;
- HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
- rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+ // LOCAL paths are opened in place; every other file-system type is read from a local temp copy.
+ // NOTE(review): nothing visible here deletes that temporary copy after the stream is closed — confirm cleanup.
+ if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
+ HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+ rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+ } else {
+ rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration));
+ }
return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
}
+
+ /**
+  * Copies hdfsPath from its file system to a local temporary location and returns the
+  * local file to read. A plain file is returned as copied. For a directory, the copied
+  * children are concatenated with FileUtil.copyMerge, and the merged file inside the
+  * temporary directory is returned.
+  *
+  * NOTE(review): nothing in this method deletes the temporary copy; confirm whether cleanup
+  * happens elsewhere. The configuration parameter is currently unused.
+  *
+  * @throws RuntimeCamelException wrapping any IOException raised by the copy or the merge
+  */
+ private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
+     try {
+         Path source = new Path(hdfsPath);
+         // Last path component with no separator: newer JDKs reject a temp-file prefix that
+         // contains '/', and File.createTempFile requires a prefix of at least 3 characters.
+         String fname = source.getName();
+         String prefix = fname.length() < 3 ? fname + "___" : fname;
+
+         File outputDest = File.createTempFile(prefix, ".hdfs");
+         // createTempFile leaves an empty placeholder; remove it so the copy below can create
+         // its own file or (for a directory source) directory at this location.
+         if (outputDest.exists()) {
+             outputDest.delete();
+         }
+
+         HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+         FileSystem fileSystem = hdfsInfo.getFileSystem();
+         boolean isDirectory = fileSystem.getFileStatus(source).isDir();
+         if (!FileUtil.copy(fileSystem, source, outputDest, false, fileSystem.getConf())) {
+             throw new IOException("Unable to copy " + hdfsPath + " to " + outputDest);
+         }
+         if (!isDirectory) {
+             // A single file needs no merging: the local copy is the result. Decided explicitly
+             // rather than by relying on copyMerge throwing for a non-directory source.
+             return outputDest;
+         }
+
+         FileUtil.copyMerge(
+                 fileSystem, // src
+                 source,
+                 FileSystem.getLocal(new Configuration()), // dest
+                 new Path(outputDest.toURI()),
+                 false, fileSystem.getConf(), null);
+         // NOTE(review): assumes copyMerge writes into the existing destination directory under
+         // the source's name, as the original return value did; confirm for the Hadoop version used.
+         return new File(outputDest, fname);
+     } catch (IOException ex) {
+         throw new RuntimeCamelException(ex);
+     }
+ }
},
SEQUENCE_FILE {