Posted to commits@camel.apache.org by da...@apache.org on 2011/10/22 16:04:42 UTC

svn commit: r1187718 - in /camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs: HdfsConsumer.java HdfsFileType.java

Author: davsclaus
Date: Sat Oct 22 14:04:42 2011
New Revision: 1187718

URL: http://svn.apache.org/viewvc?rev=1187718&view=rev
Log:
CAMEL-4555: camel-hadoop supports merging multiple segment files in the consumer. Thanks to Ben Hoyt for the patch.
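
The practical effect: a consumer endpoint pointed at a MapReduce output
directory can now pick the directory up as a single merged payload instead
of choking on the individual part-* segment files. A minimal sketch of such
a route in the Camel Java DSL (the namenode host, port and paths below are
hypothetical):

    from("hdfs://localhost:9000/user/demo/job-output?fileType=NORMAL_FILE")
        .to("file:/tmp/merged");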

Modified:
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1187718&r1=1187717&r2=1187718&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java Sat Oct 22 14:04:42 2011
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import javax.xml.ws.Holder;
 
 import org.apache.camel.Exchange;
@@ -27,6 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -73,10 +76,16 @@ public final class HdfsConsumer extends 
             Path pattern = info.getPath().suffix("/" + this.config.getPattern());
             fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
         }
+
         if (fileStatuses.length > 0) {
             this.idle.set(false);
         }
+
         for (int i = 0; i < fileStatuses.length; ++i) {
+            FileStatus status = fileStatuses[i];
+            if (normalFileIsDirectoryNoSuccessFile(status, info)) {
+                continue;
+            }
             try {
                 this.rwlock.writeLock().lock();
                 this.istream = HdfsInputStream.createInputStream(fileStatuses[i].getPath().toString(), this.config);
@@ -89,6 +98,8 @@ public final class HdfsConsumer extends 
             while (this.istream.next(key, value) != 0) {
                 Exchange exchange = this.getEndpoint().createExchange();
                 Message message = new DefaultMessage();
+                message.setHeader(Exchange.FILE_NAME, StringUtils
+                        .substringAfterLast(status.getPath().toString(), "/"));
                 if (key.value != null) {
                     message.setHeader(HdfsHeader.KEY.name(), key.value);
                 }
@@ -103,6 +114,16 @@ public final class HdfsConsumer extends 
         return numMessages;
     }
 
+    private boolean normalFileIsDirectoryNoSuccessFile(FileStatus status, HdfsInfo info) throws IOException {
+        if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && status.isDir()) {
+            Path successPath = new Path(status.getPath().toString() + "/_SUCCESS");
+            if (!info.getFileSystem().exists(successPath)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public HdfsInputStream getIstream() {
         try {
             rwlock.readLock().lock();

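The new normalFileIsDirectoryNoSuccessFile check above makes the consumer
skip a directory until Hadoop's output committer has written its _SUCCESS
marker, so partially written job output is never consumed; the new
Exchange.FILE_NAME header carries the last path segment of the file being
consumed. A standalone sketch of the same readiness test, assuming the
usual Hadoop client on the classpath (namenode and paths are hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class SuccessMarkerCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
            Path out = new Path("/user/demo/job-output");
            // Only treat a directory as ready once the job's output
            // committer has dropped its _SUCCESS marker into it.
            boolean ready = fs.getFileStatus(out).isDir()
                    && fs.exists(new Path(out, "_SUCCESS"));
            System.out.println(out + " ready: " + ready);
        }
    }
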
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java?rev=1187718&r1=1187717&r2=1187718&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java Sat Oct 22 14:04:42 2011
@@ -18,6 +18,8 @@ package org.apache.camel.component.hdfs;
 
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -25,6 +27,7 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.xml.ws.Holder;
 
 import org.apache.camel.RuntimeCamelException;
@@ -32,6 +35,9 @@ import org.apache.camel.TypeConverter;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayFile;
 import org.apache.hadoop.io.BloomMapFile;
 import org.apache.hadoop.io.BooleanWritable;
@@ -63,13 +69,7 @@ public enum HdfsFileType {
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
             } finally {
-                if (is != null) {
-                    try {
-                        is.close();
-                    } catch (IOException e) {
-                        throw new RuntimeException("Error closing stream", e);
-                    }
-                }
+                IOHelper.close(is);
             }
         }
 
@@ -123,13 +123,46 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
-                rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+                if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
+                    HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                    rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+                } else {
+                    rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration));
+                }
                 return rin;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
             }
         }
+
+        private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
+
+                File outputDest = File.createTempFile(fname, ".hdfs");
+                if (outputDest.exists()) {
+                    outputDest.delete();
+                }
+
+                HdfsInfo hdfsInfo = new HdfsInfo(hdfsPath);
+                FileSystem fileSystem = hdfsInfo.getFileSystem();
+                FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf());
+                try {
+                    FileUtil.copyMerge(
+                            fileSystem, // src
+                            new Path(hdfsPath),
+                            FileSystem.getLocal(new Configuration()), // dest
+                            new Path(outputDest.toURI()),
+                            false, fileSystem.getConf(), null);
+                } catch (IOException e) {
+                    return outputDest;
+                }
+
+                return new File(outputDest, fname);
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
     },
 
     SEQUENCE_FILE {
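
In the NORMAL_FILE branch above, a non-LOCAL file system type now routes
reads through a local temp file: the HDFS path (which may be a directory of
part-* segment files) is pulled down with FileUtil.copy, merged with
FileUtil.copyMerge where possible, and then streamed via FileInputStream. A
hedged standalone sketch of just the merge step (namenode and paths are
hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public final class CopyMergeSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem hdfs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
            Path src = new Path("/user/demo/job-output");   // directory of segments
            Path dst = new Path("/tmp/job-output.merged");  // single local file
            // Concatenates every file under src into one local file;
            // returns false if src is not a directory.
            FileUtil.copyMerge(hdfs, src, FileSystem.getLocal(conf), dst,
                    false /* deleteSource */, conf, null /* addString */);
        }
    }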