Posted to commits@camel.apache.org by da...@apache.org on 2014/03/12 14:18:41 UTC

[08/10] git commit: [CAMEL-7249] Working version of camel-hdfs2 component

[CAMEL-7249] Working version of camel-hdfs2 component

* All unit tests work fine.
* All usages of deprecated APIs have been replaced.
* hdfs://localhost tests work (@Ignored - they require an external HDFS system)
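
As a quick illustration of how the component is meant to be used, a minimal Camel route could look like the sketch below. This is only a hedged example: the hdfs2 scheme and the overwrite option come from the code in this commit, while the host, port and paths are placeholders.

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsWriteRoute extends RouteBuilder {
        @Override
        public void configure() {
            // copy local files into HDFS, overwriting existing targets
            from("file:data/inbox")
                .to("hdfs2://localhost:8020/user/camel/outbox?overwrite=true");
        }
    }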

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af7661ab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af7661ab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af7661ab

Branch: refs/heads/master
Commit: af7661abb98b85658c35974bb914aae6602365a4
Parents: 7ad65f9
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Mon Mar 10 15:05:43 2014 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Wed Mar 12 09:56:42 2014 +0100

----------------------------------------------------------------------
 components/camel-hdfs2/pom.xml                  |  108 ++
 .../camel/component/hdfs2/HdfsComponent.java    |   82 +
 .../component/hdfs2/HdfsCompressionCodec.java   |   49 +
 .../component/hdfs2/HdfsConfiguration.java      |  410 +++++
 .../camel/component/hdfs2/HdfsConstants.java    |   51 +
 .../camel/component/hdfs2/HdfsConsumer.java     |  179 ++
 .../camel/component/hdfs2/HdfsEndpoint.java     |   62 +
 .../component/hdfs2/HdfsFileSystemType.java     |   51 +
 .../camel/component/hdfs2/HdfsFileType.java     |  522 ++++++
 .../camel/component/hdfs2/HdfsHeader.java       |   23 +
 .../apache/camel/component/hdfs2/HdfsInfo.java  |   51 +
 .../camel/component/hdfs2/HdfsInfoFactory.java  |   37 +
 .../camel/component/hdfs2/HdfsInputStream.java  |   92 +
 .../camel/component/hdfs2/HdfsOutputStream.java |  118 ++
 .../camel/component/hdfs2/HdfsProducer.java     |  298 ++++
 .../component/hdfs2/HdfsWritableFactories.java  |  310 ++++
 .../apache/camel/component/hdfs2/Holder.java    |   40 +
 .../src/main/resources/META-INF/LICENSE.txt     |  203 +++
 .../src/main/resources/META-INF/NOTICE.txt      |   11 +
 .../services/org/apache/camel/component/hdfs2   |   18 +
 .../component/hdfs2/FromFileToHdfsTest.java     |  111 ++
 .../camel/component/hdfs2/HdfsConsumerTest.java |  391 +++++
 .../hdfs2/HdfsProducerConsumerTest.java         |   93 +
 .../component/hdfs2/HdfsProducerSplitTest.java  |  134 ++
 .../camel/component/hdfs2/HdfsProducerTest.java |  424 +++++
 .../camel/component/hdfs2/HdfsTestSupport.java  |   40 +
 .../hdfs2/integration/HdfsAppendTest.java       |   94 +
 .../HdfsProducerConsumerIntegrationTest.java    |   88 +
 .../src/test/resources/hdfs-default.xml         | 1607 ++++++++++++++++++
 .../src/test/resources/hdfs-test.xml            | 1607 ++++++++++++++++++
 .../src/test/resources/log4j.properties         |   39 +
 parent/pom.xml                                  |   12 +
 32 files changed, 7355 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/pom.xml b/components/camel-hdfs2/pom.xml
new file mode 100644
index 0000000..61f63b1
--- /dev/null
+++ b/components/camel-hdfs2/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>2.13-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-hdfs2</artifactId>
+    <packaging>bundle</packaging>
+    <name>Camel :: HDFS2</name>
+    <description>Camel HDFS support with Hadoop 2.x libraries</description>
+
+    <properties>
+        <camel.osgi.export.pkg>org.apache.camel.component.hdfs2.*</camel.osgi.export.pkg>
+        <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=hdfs2</camel.osgi.export.service>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>${commons-codec-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>${commons-configuration-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>${commons-lang-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>${commons-net-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>${jackson-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>${jackson-version}</version>
+        </dependency>
+
+        <!-- testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
new file mode 100644
index 0000000..52a8888
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.net.URL;
+import java.util.Map;
+import javax.security.auth.login.Configuration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HdfsComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsComponent.class);
+
+    public HdfsComponent() {
+        initHdfs();
+    }
+
+    public HdfsComponent(CamelContext context) {
+        super(context);
+        initHdfs();
+    }
+
+    protected final Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        HdfsEndpoint hdfsEndpoint = new HdfsEndpoint(uri, this.getCamelContext());
+        setProperties(hdfsEndpoint.getConfig(), parameters);
+        return hdfsEndpoint;
+    }
+
+    protected void initHdfs() {
+        try {
+            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
+        } catch (Throwable e) {
+            // ignore as it's most likely already set
+            LOG.debug("Cannot set URLStreamHandlerFactory due to " + e.getMessage() + ". This exception will be ignored.", e);
+        }
+    }
+
+    static Configuration getJAASConfiguration() {
+        Configuration auth = null;
+        try {
+            auth = Configuration.getConfiguration();
+            LOG.trace("Existing JAAS Configuration {}", auth);
+        } catch (SecurityException e) {
+            LOG.trace("Cannot load existing JAAS configuration", e);
+        }
+        return auth;
+    }
+
+    static void setJAASConfiguration(Configuration auth) {
+        if (auth != null) {
+            LOG.trace("Restoring existing JAAS Configuration {}", auth);
+            try {
+                Configuration.setConfiguration(auth);
+            } catch (SecurityException e) {
+                LOG.trace("Cannot restore JAAS Configuration. This exception is ignored.", e);
+            }
+        } else {
+            LOG.trace("No JAAS Configuration to restore");
+        }
+    }
+
+}
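
Note on the two JAAS helpers above: Hadoop replaces the JVM-wide JAAS configuration when it authenticates, so callers are expected to save the configuration before touching HDFS and restore it afterwards (HdfsConsumer.poll() below does exactly this). A minimal usage sketch:

    javax.security.auth.login.Configuration auth = HdfsComponent.getJAASConfiguration();
    try {
        // ... any code that talks to HDFS and may replace the JAAS configuration ...
    } finally {
        HdfsComponent.setJAASConfiguration(auth); // restore what was there before
    }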

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsCompressionCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsCompressionCodec.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsCompressionCodec.java
new file mode 100644
index 0000000..5a2926c
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsCompressionCodec.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+
+public enum HdfsCompressionCodec {
+
+    DEFAULT {
+        @Override
+        public CompressionCodec getCodec() {
+            return new DefaultCodec();
+        }
+    },
+
+    GZIP {
+        @Override
+        public CompressionCodec getCodec() {
+            return new GzipCodec();
+        }
+    },
+
+    BZIP2 {
+        @Override
+        public CompressionCodec getCodec() {
+            return new BZip2Codec();
+        }
+    };
+
+    public abstract CompressionCodec getCodec();
+    
+}
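
Each enum constant overrides getCodec(), so a codec can be resolved from its name at runtime; this is how the compressionCodec endpoint option is mapped in HdfsConfiguration below. A small sketch:

    import org.apache.hadoop.io.compress.CompressionCodec;

    // "GZIP" would typically come from the compressionCodec URI parameter
    CompressionCodec codec = HdfsCompressionCodec.valueOf("GZIP").getCodec();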

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
new file mode 100644
index 0000000..e1b6093
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.util.URISupport;
+import org.apache.hadoop.io.SequenceFile;
+
+public class HdfsConfiguration {
+
+    private URI uri;
+    private String hostName;
+    private int port = HdfsConstants.DEFAULT_PORT;
+    private String path;
+    private boolean overwrite = true;
+    private boolean append;
+    private boolean wantAppend;
+    private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE;
+    private short replication = HdfsConstants.DEFAULT_REPLICATION;
+    private long blockSize = HdfsConstants.DEFAULT_BLOCKSIZE;
+    private SequenceFile.CompressionType compressionType = HdfsConstants.DEFAULT_COMPRESSIONTYPE;
+    private HdfsCompressionCodec compressionCodec = HdfsConstants.DEFAULT_CODEC;
+    private HdfsFileType fileType = HdfsFileType.NORMAL_FILE;
+    private HdfsFileSystemType fileSystemType = HdfsFileSystemType.HDFS;
+    private HdfsWritableFactories.WritableType keyType = HdfsWritableFactories.WritableType.NULL;
+    private HdfsWritableFactories.WritableType valueType = HdfsWritableFactories.WritableType.BYTES;
+    private String openedSuffix = HdfsConstants.DEFAULT_OPENED_SUFFIX;
+    private String readSuffix = HdfsConstants.DEFAULT_READ_SUFFIX;
+    private long initialDelay;
+    private long delay = HdfsConstants.DEFAULT_DELAY;
+    private String pattern = HdfsConstants.DEFAULT_PATTERN;
+    private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE;
+    private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
+    private List<HdfsProducer.SplitStrategy> splitStrategies;
+    private boolean connectOnStartup = true;
+
+    public HdfsConfiguration() {
+    }
+
+    private Boolean getBoolean(Map<String, Object> hdfsSettings, String param, Boolean dflt) {
+        if (hdfsSettings.containsKey(param)) {
+            return Boolean.valueOf((String) hdfsSettings.get(param));
+        } else {
+            return dflt;
+        }
+    }
+
+    private Integer getInteger(Map<String, Object> hdfsSettings, String param, Integer dflt) {
+        if (hdfsSettings.containsKey(param)) {
+            return Integer.valueOf((String) hdfsSettings.get(param));
+        } else {
+            return dflt;
+        }
+    }
+
+    private Short getShort(Map<String, Object> hdfsSettings, String param, Short dflt) {
+        if (hdfsSettings.containsKey(param)) {
+            return Short.valueOf((String) hdfsSettings.get(param));
+        } else {
+            return dflt;
+        }
+    }
+
+    private Long getLong(Map<String, Object> hdfsSettings, String param, Long dflt) {
+        if (hdfsSettings.containsKey(param)) {
+            return Long.valueOf((String) hdfsSettings.get(param));
+        } else {
+            return dflt;
+        }
+    }
+
+    private HdfsFileType getFileType(Map<String, Object> hdfsSettings, String param, HdfsFileType dflt) {
+        String eit = (String) hdfsSettings.get(param);
+        if (eit != null) {
+            return HdfsFileType.valueOf(eit);
+        } else {
+            return dflt;
+        }
+    }
+
+    private HdfsFileSystemType getFileSystemType(Map<String, Object> hdfsSettings, String param, HdfsFileSystemType dflt) {
+        String eit = (String) hdfsSettings.get(param);
+        if (eit != null) {
+            return HdfsFileSystemType.valueOf(eit);
+        } else {
+            return dflt;
+        }
+    }
+
+    private HdfsWritableFactories.WritableType getWritableType(Map<String, Object> hdfsSettings, String param, HdfsWritableFactories.WritableType dflt) {
+        String eit = (String) hdfsSettings.get(param);
+        if (eit != null) {
+            return HdfsWritableFactories.WritableType.valueOf(eit);
+        } else {
+            return dflt;
+        }
+    }
+
+    private SequenceFile.CompressionType getCompressionType(Map<String, Object> hdfsSettings, String param, SequenceFile.CompressionType ct) {
+        String eit = (String) hdfsSettings.get(param);
+        if (eit != null) {
+            return SequenceFile.CompressionType.valueOf(eit);
+        } else {
+            return ct;
+        }
+    }
+
+    private HdfsCompressionCodec getCompressionCodec(Map<String, Object> hdfsSettings, String param, HdfsCompressionCodec cd) {
+        String eit = (String) hdfsSettings.get(param);
+        if (eit != null) {
+            return HdfsCompressionCodec.valueOf(eit);
+        } else {
+            return cd;
+        }
+    }
+
+    private String getString(Map<String, Object> hdfsSettings, String param, String dflt) {
+        if (hdfsSettings.containsKey(param)) {
+            return (String) hdfsSettings.get(param);
+        } else {
+            return dflt;
+        }
+    }
+
+    private List<HdfsProducer.SplitStrategy> getSplitStrategies(Map<String, Object> hdfsSettings) {
+        List<HdfsProducer.SplitStrategy> strategies = new ArrayList<HdfsProducer.SplitStrategy>();
+        for (Object obj : hdfsSettings.keySet()) {
+            String key = (String) obj;
+            if ("splitStrategy".equals(key)) {
+                String eit = (String) hdfsSettings.get(key);
+                if (eit != null) {
+                    String[] strstrategies = eit.split(",");
+                    for (String strstrategy : strstrategies) {
+                        String[] tokens = strstrategy.split(":");
+                        if (tokens.length != 2) {
+                            throw new IllegalArgumentException("Wrong Split Strategy " + key + "=" + eit);
+                        }
+                        HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
+                        long ssv = Long.valueOf(tokens[1]);
+                        strategies.add(new HdfsProducer.SplitStrategy(sst, ssv));
+                    }
+                }
+            }
+        }
+        return strategies;
+    }
+
+    public void checkConsumerOptions() {
+    }
+
+    public void checkProducerOptions() {
+        if (isAppend()) {
+            if (getSplitStrategies().size() != 0) {
+                throw new IllegalArgumentException("Split Strategies incompatible with append=true");
+            }
+            if (getFileType() != HdfsFileType.NORMAL_FILE) {
+                throw new IllegalArgumentException("append=true works only with NORMAL_FILEs");
+            }
+        }
+    }
+
+    public void parseURI(URI uri) throws URISyntaxException {
+        String protocol = uri.getScheme();
+        if (!protocol.equalsIgnoreCase("hdfs2")) {
+            throw new IllegalArgumentException("Unrecognized Cache protocol: " + protocol + " for uri: " + uri);
+        }
+        hostName = uri.getHost();
+        if (hostName == null) {
+            hostName = "localhost";
+        }
+        port = uri.getPort() == -1 ? HdfsConstants.DEFAULT_PORT : uri.getPort();
+        path = uri.getPath();
+        Map<String, Object> hdfsSettings = URISupport.parseParameters(uri);
+
+        overwrite = getBoolean(hdfsSettings, "overwrite", overwrite);
+        append = getBoolean(hdfsSettings, "append", append);
+        wantAppend = append;
+        bufferSize = getInteger(hdfsSettings, "bufferSize", bufferSize);
+        replication = getShort(hdfsSettings, "replication", replication);
+        blockSize = getLong(hdfsSettings, "blockSize", blockSize);
+        compressionType = getCompressionType(hdfsSettings, "compressionType", compressionType);
+        compressionCodec = getCompressionCodec(hdfsSettings, "compressionCodec", compressionCodec);
+        fileType = getFileType(hdfsSettings, "fileType", fileType);
+        fileSystemType = getFileSystemType(hdfsSettings, "fileSystemType", fileSystemType);
+        keyType = getWritableType(hdfsSettings, "keyType", keyType);
+        valueType = getWritableType(hdfsSettings, "valueType", valueType);
+        openedSuffix = getString(hdfsSettings, "openedSuffix", openedSuffix);
+        readSuffix = getString(hdfsSettings, "readSuffix", readSuffix);
+        initialDelay = getLong(hdfsSettings, "initialDelay", initialDelay);
+        delay = getLong(hdfsSettings, "delay", delay);
+        pattern = getString(hdfsSettings, "pattern", pattern);
+        chunkSize = getInteger(hdfsSettings, "chunkSize", chunkSize);
+        splitStrategies = getSplitStrategies(hdfsSettings);
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public boolean isOverwrite() {
+        return overwrite;
+    }
+
+    public void setOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+    }
+
+    public boolean isAppend() {
+        return append;
+    }
+
+    public boolean isWantAppend() {
+        return wantAppend;
+    }
+
+    public void setAppend(boolean append) {
+        this.append = append;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public short getReplication() {
+        return replication;
+    }
+
+    public void setReplication(short replication) {
+        this.replication = replication;
+    }
+
+    public long getBlockSize() {
+        return blockSize;
+    }
+
+    public void setBlockSize(long blockSize) {
+        this.blockSize = blockSize;
+    }
+
+    public HdfsFileType getFileType() {
+        return fileType;
+    }
+
+    public void setFileType(HdfsFileType fileType) {
+        this.fileType = fileType;
+    }
+
+    public SequenceFile.CompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    public void setCompressionType(SequenceFile.CompressionType compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    public HdfsCompressionCodec getCompressionCodec() {
+        return compressionCodec;
+    }
+
+    public void setCompressionCodec(HdfsCompressionCodec compressionCodec) {
+        this.compressionCodec = compressionCodec;
+    }
+
+    public void setFileSystemType(HdfsFileSystemType fileSystemType) {
+        this.fileSystemType = fileSystemType;
+    }
+
+    public HdfsFileSystemType getFileSystemType() {
+        return fileSystemType;
+    }
+
+    public HdfsWritableFactories.WritableType getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(HdfsWritableFactories.WritableType keyType) {
+        this.keyType = keyType;
+    }
+
+    public HdfsWritableFactories.WritableType getValueType() {
+        return valueType;
+    }
+
+    public void setValueType(HdfsWritableFactories.WritableType valueType) {
+        this.valueType = valueType;
+    }
+
+    public void setOpenedSuffix(String openedSuffix) {
+        this.openedSuffix = openedSuffix;
+    }
+
+    public String getOpenedSuffix() {
+        return openedSuffix;
+    }
+
+    public void setReadSuffix(String readSuffix) {
+        this.readSuffix = readSuffix;
+    }
+
+    public String getReadSuffix() {
+        return readSuffix;
+    }
+
+    public void setInitialDelay(long initialDelay) {
+        this.initialDelay = initialDelay;
+    }
+
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public String getPattern() {
+        return pattern;
+    }
+
+    public void setChunkSize(int chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+
+    public int getChunkSize() {
+        return chunkSize;
+    }
+
+    public void setCheckIdleInterval(int checkIdleInterval) {
+        this.checkIdleInterval = checkIdleInterval;
+    }
+
+    public int getCheckIdleInterval() {
+        return checkIdleInterval;
+    }
+
+    public List<HdfsProducer.SplitStrategy> getSplitStrategies() {
+        return splitStrategies;
+    }
+
+    public void setSplitStrategy(String splitStrategy) {
+        // noop
+    }
+
+    public boolean isConnectOnStartup() {
+        return connectOnStartup;
+    }
+
+    public void setConnectOnStartup(boolean connectOnStartup) {
+        this.connectOnStartup = connectOnStartup;
+    }
+}
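
The splitStrategy option parsed by getSplitStrategies() above is a comma-separated list of TYPE:VALUE tokens. Assuming BYTES and IDLE are among the HdfsProducer.SplitStrategyType constants (HdfsProducer is part of this commit but outside this excerpt), an endpoint URI could look like:

    // start a new output file after 5 MB written or 1 second of idle time
    to("hdfs2://localhost:8020/user/camel/out?splitStrategy=BYTES:5242880,IDLE:1000");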

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConstants.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConstants.java
new file mode 100644
index 0000000..bfd26f1
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConstants.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import org.apache.hadoop.io.SequenceFile;
+
+public final class HdfsConstants {
+
+    public static final int DEFAULT_PORT = 8020;
+
+    public static final int DEFAULT_BUFFERSIZE = 4096;
+
+    public static final short DEFAULT_REPLICATION = 3;
+
+    public static final long DEFAULT_BLOCKSIZE = 64 * 1024 * 1024L;
+
+    public static final SequenceFile.CompressionType DEFAULT_COMPRESSIONTYPE = SequenceFile.CompressionType.NONE;
+
+    public static final HdfsCompressionCodec DEFAULT_CODEC = HdfsCompressionCodec.DEFAULT;
+
+    public static final String DEFAULT_OPENED_SUFFIX = "opened";
+
+    public static final String DEFAULT_READ_SUFFIX = "read";
+
+    public static final String DEFAULT_SEGMENT_PREFIX = "seg";
+
+    public static final long DEFAULT_DELAY = 1000L;
+
+    public static final String DEFAULT_PATTERN = "*";
+
+    public static final int DEFAULT_CHECK_IDLE_INTERVAL = 500;
+
+    public static final String HDFS_CLOSE = "CamelHdfsClose";
+
+    private HdfsConstants() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
new file mode 100644
index 0000000..a437069
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.security.auth.login.Configuration;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public final class HdfsConsumer extends ScheduledPollConsumer {
+
+    private final HdfsConfiguration config;
+    private final StringBuilder hdfsPath;
+    private final Processor processor;
+    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
+    private volatile HdfsInputStream istream;
+
+    public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
+        super(endpoint, processor);
+        this.config = config;
+        this.hdfsPath = config.getFileSystemType().getHdfsPath(config);
+        this.processor = processor;
+
+        setInitialDelay(config.getInitialDelay());
+        setDelay(config.getDelay());
+        setUseFixedDelay(true);
+    }
+
+    @Override
+    public HdfsEndpoint getEndpoint() {
+        return (HdfsEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (config.isConnectOnStartup()) {
+            // setup hdfs if configured to do on startup
+            setupHdfs(true);
+        }
+    }
+
+    private HdfsInfo setupHdfs(boolean onStartup) throws Exception {
+        // when starting up, log at INFO level; at runtime, log at DEBUG level so as not to flood the log
+        if (onStartup) {
+            log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+            }
+        }
+
+        // hadoop will cache the connection by default, so it's faster to get in the poll method
+        HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString());
+
+        if (onStartup) {
+            log.info("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // remember the JAAS auth as Hadoop will override it, which would otherwise leave the auth broken afterwards
+        Configuration auth = HdfsComponent.getJAASConfiguration();
+        try {
+            return doPoll();
+        } finally {
+            HdfsComponent.setJAASConfiguration(auth);
+        }
+    }
+
+    protected int doPoll() throws Exception {
+        class ExcludePathFilter implements PathFilter {
+            public boolean accept(Path path) {
+                return !(path.toString().endsWith(config.getOpenedSuffix()) || path.toString().endsWith(config.getReadSuffix()));
+            }
+        }
+
+        int numMessages = 0;
+
+        HdfsInfo info = setupHdfs(false);
+        FileStatus[] fileStatuses;
+        if (info.getFileSystem().isFile(info.getPath())) {
+            fileStatuses = info.getFileSystem().globStatus(info.getPath());
+        } else {
+            Path pattern = info.getPath().suffix("/" + this.config.getPattern());
+            fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
+        }
+
+        for (FileStatus status : fileStatuses) {
+            if (normalFileIsDirectoryNoSuccessFile(status, info)) {
+                continue;
+            }
+            try {
+                this.rwlock.writeLock().lock();
+                this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+            } finally {
+                this.rwlock.writeLock().unlock();
+            }
+
+            try {
+                Holder<Object> key = new Holder<Object>();
+                Holder<Object> value = new Holder<Object>();
+                while (this.istream.next(key, value) != 0) {
+                    Exchange exchange = this.getEndpoint().createExchange();
+                    Message message = new DefaultMessage();
+                    String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
+                    message.setHeader(Exchange.FILE_NAME, fileName);
+                    if (key.value != null) {
+                        message.setHeader(HdfsHeader.KEY.name(), key.value);
+                    }
+                    message.setBody(value.value);
+                    exchange.setIn(message);
+
+                    log.debug("Processing file {}", fileName);
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
+                    }
+
+                    // in case of unhandled exceptions then let the exception handler handle them
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException(exchange.getException());
+                    }
+
+                    numMessages++;
+                }
+            } finally {
+                IOHelper.close(istream, "input stream", log);
+            }
+        }
+
+        return numMessages;
+    }
+
+    private boolean normalFileIsDirectoryNoSuccessFile(FileStatus status, HdfsInfo info) throws IOException {
+        if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && status.isDirectory()) {
+            Path successPath = new Path(status.getPath().toString() + "/_SUCCESS");
+            if (!info.getFileSystem().exists(successPath)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
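
Note the _SUCCESS convention in normalFileIsDirectoryNoSuccessFile(): with fileType=NORMAL_FILE a directory is skipped until it contains a _SUCCESS marker, the empty file MapReduce jobs drop on completion. A writer outside MapReduce would have to create the marker itself; a hedged sketch against the plain Hadoop API, with the paths as placeholders:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateSuccessMarker {
        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            // an empty file is enough; the consumer only checks for its existence
            fs.create(new Path("/user/camel/in/job-output/_SUCCESS")).close();
        }
    }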

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsEndpoint.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsEndpoint.java
new file mode 100644
index 0000000..87a3ac8
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsEndpoint.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public class HdfsEndpoint extends DefaultEndpoint {
+
+    private final HdfsConfiguration config;
+
+    @SuppressWarnings("deprecation")
+    public HdfsEndpoint(String endpointUri, CamelContext context) throws URISyntaxException {
+        super(endpointUri, context);
+        this.config = new HdfsConfiguration();
+        this.config.parseURI(new URI(endpointUri));
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        config.checkConsumerOptions();
+        HdfsConsumer answer = new HdfsConsumer(this, processor, config);
+        configureConsumer(answer);
+        return answer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        config.checkProducerOptions();
+        return new HdfsProducer(this, config);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public HdfsConfiguration getConfig() {
+        return config;
+    }
+
+}
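
All consumer scheduling and filtering options (initialDelay, delay, pattern, and so on) reach HdfsConsumer through this endpoint's URI. A polling route might look like the following sketch, with host, port and path as placeholders:

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsPollRoute extends RouteBuilder {
        @Override
        public void configure() {
            // poll every 5 seconds; files carrying the opened/read suffixes are filtered out
            from("hdfs2://localhost:8020/user/camel/in?pattern=*&initialDelay=0&delay=5000")
                .to("log:hdfs2?level=INFO");
        }
    }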

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileSystemType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileSystemType.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileSystemType.java
new file mode 100644
index 0000000..8746bd2
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileSystemType.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+public enum HdfsFileSystemType {
+
+    LOCAL {
+        @Override
+        public StringBuilder getHdfsPath(HdfsConfiguration config) {
+            StringBuilder hpath = new StringBuilder();
+            hpath.append("file://");
+            hpath.append(config.getPath());
+            if (config.getSplitStrategies().size() > 0) {
+                hpath.append('/');
+            }
+            return hpath;
+        }
+    },
+
+    HDFS {
+        @Override
+        public StringBuilder getHdfsPath(HdfsConfiguration config) {
+            StringBuilder hpath = new StringBuilder();
+            hpath.append("hdfs://");
+            hpath.append(config.getHostName());
+            hpath.append(':');
+            hpath.append(config.getPort());
+            hpath.append(config.getPath());
+            if (config.getSplitStrategies().size() > 0) {
+                hpath.append('/');
+            }
+            return hpath;
+        }
+    };
+
+    public abstract StringBuilder getHdfsPath(HdfsConfiguration conf);
+}
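
The two constants simply build the base URI for the file system: LOCAL produces file://<path> and HDFS produces hdfs://<host>:<port><path>, each with a trailing slash when split strategies are configured. For example, assuming a configuration with hostName=localhost, port=8020 and path=/user/camel:

    StringBuilder p = HdfsFileSystemType.HDFS.getHdfsPath(config);
    // p.toString() -> "hdfs://localhost:8020/user/camel"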

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
new file mode 100644
index 0000000..bb08440
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.util.IOHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayFile;
+import org.apache.hadoop.io.BloomMapFile;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public enum HdfsFileType {
+
+    NORMAL_FILE {
+        @Override
+        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+            InputStream is = null;
+            try {
+                is = typeConverter.convertTo(InputStream.class, value);
+                return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            } finally {
+                IOHelper.close(is);
+            }
+        }
+
+        @Override
+        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+            try {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsistr.getChunkSize());
+                byte[] buf = new byte[HdfsConstants.DEFAULT_BUFFERSIZE];
+                int bytesRead = ((InputStream) hdfsistr.getIn()).read(buf);
+                if (bytesRead >= 0) {
+                    bos.write(buf, 0, bytesRead);
+                    key.value = null;
+                    value.value = bos;
+                    return bytesRead;
+                } else {
+                    key.value = null;
+                    value.value = null;
+                    return 0;
+                }
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rout;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                if (!configuration.isAppend()) {
+                    rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(),
+                            configuration.getReplication(), configuration.getBlockSize(), new Progressable() {
+                                @Override
+                                public void progress() {
+                                }
+                            });
+                } else {
+                    rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), new Progressable() {
+                        @Override
+                        public void progress() {
+                        }
+                    });
+                }
+                return rout;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rin;
+                if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
+                    HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                    rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+                } else {
+                    rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, configuration));
+                }
+                return rin;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
+
+                File outputDest = File.createTempFile(fname, ".hdfs");
+                if (outputDest.exists()) {
+                    outputDest.delete();
+                }
+
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                FileSystem fileSystem = hdfsInfo.getFileSystem();
+                FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf());
+                try {
+                    FileUtil.copyMerge(
+                            fileSystem, // src
+                            new Path(hdfsPath),
+                            FileSystem.getLocal(new Configuration()), // dest
+                            new Path(outputDest.toURI()),
+                            false, fileSystem.getConf(), null);
+                } catch (IOException e) {
+                    return outputDest;
+                }
+
+                return new File(outputDest, fname);
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+    },
+
+    SEQUENCE_FILE {
+        @Override
+        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+            try {
+                Holder<Integer> keySize = new Holder<Integer>();
+                Writable keyWritable = getWritable(key, typeConverter, keySize);
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = getWritable(value, typeConverter, valueSize);
+                Writer writer = (SequenceFile.Writer) hdfsostr.getOut();
+                writer.append(keyWritable, valueWritable);
+                writer.sync();
+                return keySize.value + valueSize.value;
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+            try {
+                SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn();
+                Holder<Integer> keySize = new Holder<Integer>();
+                Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+                if (reader.next(keyWritable, valueWritable)) {
+                    key.value = getObject(keyWritable, keySize);
+                    value.value = getObject(valueWritable, valueSize);
+                    return keySize.value + valueSize.value;
+                } else {
+                    return 0;
+                }
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rout;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
+                Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
+                rout = SequenceFile.createWriter(hdfsInfo.getConf(), Writer.file(hdfsInfo.getPath()), Writer.keyClass(keyWritableClass),
+                        Writer.valueClass(valueWritableClass), Writer.bufferSize(configuration.getBufferSize()),
+                        Writer.replication(configuration.getReplication()), Writer.blockSize(configuration.getBlockSize()),
+                        Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                        Writer.progressable(new Progressable() {
+                            @Override
+                            public void progress() {
+                            }
+                        }), Writer.metadata(new SequenceFile.Metadata()));
+                return rout;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rin;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                rin = new SequenceFile.Reader(hdfsInfo.getConf(), Reader.file(hdfsInfo.getPath()));
+                return rin;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+    },
+
+    MAP_FILE {
+        @Override
+        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+            try {
+                Holder<Integer> keySize = new Holder<Integer>();
+                Writable keyWritable = getWritable(key, typeConverter, keySize);
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = getWritable(value, typeConverter, valueSize);
+                ((MapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+                return keySize.value + valueSize.value;
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+            try {
+                MapFile.Reader reader = (MapFile.Reader) hdfsistr.getIn();
+                Holder<Integer> keySize = new Holder<Integer>();
+                WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+                if (reader.next(keyWritable, valueWritable)) {
+                    key.value = getObject(keyWritable, keySize);
+                    value.value = getObject(valueWritable, valueSize);
+                    return keySize.value + valueSize.value;
+                } else {
+                    return 0;
+                }
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rout;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
+                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+                rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+                    MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                    MapFile.Writer.progressable(new Progressable() {
+                        @Override
+                        public void progress() {
+                        }
+                    }));
+                return rout;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rin;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+                return rin;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+    },
+
+    BLOOMMAP_FILE {
+        @Override
+        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+            try {
+                Holder<Integer> keySize = new Holder<Integer>();
+                Writable keyWritable = getWritable(key, typeConverter, keySize);
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = getWritable(value, typeConverter, valueSize);
+                ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+                return keySize.value + valueSize.value;
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+            try {
+                MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn();
+                Holder<Integer> keySize = new Holder<Integer>();
+                WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+                if (reader.next(keyWritable, valueWritable)) {
+                    key.value = getObject(keyWritable, keySize);
+                    value.value = getObject(valueWritable, valueSize);
+                    return keySize.value + valueSize.value;
+                } else {
+                    return 0;
+                }
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rout;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
+                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+                rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), BloomMapFile.Writer.keyClass(keyWritableClass),
+                        BloomMapFile.Writer.valueClass(valueWritableClass),
+                        BloomMapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                        BloomMapFile.Writer.progressable(new Progressable() {
+                            @Override
+                            public void progress() {
+                            }
+                        }));
+                return rout;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rin;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+                return rin;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+    },
+
+    ARRAY_FILE {
+        @Override
+        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+            try {
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = getWritable(value, typeConverter, valueSize);
+                ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable);
+                return valueSize.value;
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+            try {
+                ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn();
+                Holder<Integer> valueSize = new Holder<Integer>();
+                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+                if (reader.next(valueWritable) != null) {
+                    value.value = getObject(valueWritable, valueSize);
+                    return valueSize.value;
+                } else {
+                    return 0;
+                }
+            } catch (Exception ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rout;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+                rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+                        configuration.getCompressionType(), new Progressable() {
+                            @Override
+                            public void progress() {
+                            }
+                        });
+                return rout;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+
+        @Override
+        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+            try {
+                Closeable rin;
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
+                return rin;
+            } catch (IOException ex) {
+                throw new RuntimeCamelException(ex);
+            }
+        }
+    };
+
+    @SuppressWarnings({"rawtypes"})
+    private static final class WritableCache {
+
+        private static final Map<Class, HdfsWritableFactories.HdfsWritableFactory> writables = new HashMap<Class, HdfsWritableFactories.HdfsWritableFactory>();
+        private static final Map<Class, HdfsWritableFactories.HdfsWritableFactory> readables = new HashMap<Class, HdfsWritableFactories.HdfsWritableFactory>();
+
+        private WritableCache() {
+        }
+
+        static {
+            writables.put(Boolean.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
+            writables.put(Byte.class, new HdfsWritableFactories.HdfsByteWritableFactory());
+            writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
+            writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
+            writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
+            writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
+            writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
+            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
+        }
+
+        static {
+            readables.put(BooleanWritable.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
+            readables.put(ByteWritable.class, new HdfsWritableFactories.HdfsByteWritableFactory());
+            readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
+            readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
+            readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory());
+            readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory());
+            readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory());
+            readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
+        }
+    }
+
+    private static Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) {
+        Class<?> objCls = obj == null ? null : obj.getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
+        if (objWritableFactory == null) {
+            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
+        }
+        return objWritableFactory.create(obj, typeConverter, size);
+    }
+
+    private static Object getObject(Writable writable, Holder<Integer> size) {
+        Class<?> writableClass = NullWritable.class;
+        if (writable != null) {
+            writableClass = writable.getClass();
+        }
+        HdfsWritableFactories.HdfsWritableFactory writableObjectFactory = WritableCache.readables.get(writableClass);
+        return writableObjectFactory.read(writable, size);
+    }
+
+    public abstract long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter);
+
+    public abstract long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value);
+
+    public abstract Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration);
+
+    public abstract Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration);
+
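+    /**
+     * Copies all bytes from <code>in</code> to <code>out</code> using a buffer of
+     * <code>buffSize</code> bytes and returns the number of bytes copied; when
+     * <code>close</code> is true both streams are closed afterwards.
+     */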
+    public static long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
+        long numBytes = 0;
+        @SuppressWarnings("resource")
+        PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+        byte[] buf = new byte[buffSize];
+        try {
+            int bytesRead = in.read(buf);
+            while (bytesRead >= 0) {
+                out.write(buf, 0, bytesRead);
+                numBytes += bytesRead;
+                if ((ps != null) && ps.checkError()) {
+                    throw new IOException("Unable to write to output stream.");
+                }
+                bytesRead = in.read(buf);
+            }
+        } finally {
+            if (close) {
+                IOHelper.close(out, in);
+            }
+        }
+        return numBytes;
+    }
+}
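
For reference, a minimal sketch of the public copyBytes helper used in
isolation (the local paths and buffer size are illustrative, not part of
this commit):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.camel.component.hdfs2.HdfsFileType;

    public class CopyBytesExample {
        public static void main(String[] args) throws IOException {
            // copy with a 4 KiB buffer; true = close both streams when done
            long copied = HdfsFileType.copyBytes(
                    new FileInputStream("/tmp/source.bin"),
                    new FileOutputStream("/tmp/target.bin"),
                    4096, true);
            System.out.println("copied " + copied + " bytes");
        }
    }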

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsHeader.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsHeader.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsHeader.java
new file mode 100644
index 0000000..936c08d
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsHeader.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+public enum HdfsHeader {
+
+    KEY
+
+}
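
The producer looks the message key up under this header name, as in the
original camel-hdfs component. A sketch of a route that sets it (the
endpoint URI and option names are illustrative):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.hdfs2.HdfsHeader;

    public class KeyedWriteRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:write")
                .setHeader(HdfsHeader.KEY.name(), constant(42L))
                .to("hdfs2://localhost:9000/tmp/keyed"
                    + "?fileType=SEQUENCE_FILE&keyType=LONG&valueType=TEXT");
        }
    }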

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfo.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfo.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfo.java
new file mode 100644
index 0000000..5fb3fd4
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfo.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class HdfsInfo {
+
+    private Configuration conf;
+    private FileSystem fileSystem;
+    private Path path;
+
+    HdfsInfo(String hdfsPath) throws IOException {
+        this.conf = new Configuration();
+        // this connects to the hadoop hdfs file system; if no connection can be made,
+        // hadoop's hardcoded retry policy gives up only after 45 x 20 sec = 15 minutes
+        this.fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
+        this.path = new Path(hdfsPath);
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    public Path getPath() {
+        return path;
+    }
+}
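
HdfsInfo builds its own Configuration, so that 15-minute worst case is what
you get out of the box. For reference, these are the standard Hadoop client
settings behind that number (they are not exposed by this component; the
values below are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ShortConnectTimeout {
        public static Configuration create() {
            Configuration conf = new Configuration();
            // Hadoop defaults: 45 retries x 20000 ms = 15 minutes
            conf.setInt("ipc.client.connect.max.retries.on.timeouts", 3);
            conf.setInt("ipc.client.connect.timeout", 20000);
            return conf;
        }
    }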

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfoFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfoFactory.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfoFactory.java
new file mode 100644
index 0000000..6f44296
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInfoFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.IOException;
+import javax.security.auth.login.Configuration;
+
+public final class HdfsInfoFactory {
+
+    private HdfsInfoFactory() {
+    }
+
+    public static HdfsInfo newHdfsInfo(String hdfsPath) throws IOException {
+        // remember the JAAS configuration, as Hadoop overrides it; restore it afterwards or authentication is broken
+        Configuration auth = HdfsComponent.getJAASConfiguration();
+        try {
+            return new HdfsInfo(hdfsPath);
+        } finally {
+            HdfsComponent.setJAASConfiguration(auth);
+        }
+    }
+
+}
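
The two helpers live in HdfsComponent.java, earlier in this commit. A
plausible shape for them, assuming they delegate to the standard JAAS
javax.security.auth.login.Configuration API, is:

    import javax.security.auth.login.Configuration;

    final class JaasGuard {

        static Configuration getJAASConfiguration() {
            try {
                return Configuration.getConfiguration();
            } catch (SecurityException e) {
                // no JAAS configuration installed yet
                return null;
            }
        }

        static void setJAASConfiguration(Configuration auth) {
            if (auth != null) {
                Configuration.setConfiguration(auth);
            }
        }
    }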

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
new file mode 100644
index 0000000..951e51e
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+public class HdfsInputStream implements Closeable {
+
+    private HdfsFileType fileType;
+    private String actualPath;
+    private String suffixedPath;
+    private Closeable in;
+    private boolean opened;
+    private int chunkSize;
+    private final AtomicLong numOfReadBytes = new AtomicLong(0L);
+    private final AtomicLong numOfReadMessages = new AtomicLong(0L);
+
+    protected HdfsInputStream() {
+    }
+
+    public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException {
+        HdfsInputStream ret = new HdfsInputStream();
+        ret.fileType = configuration.getFileType();
+        ret.actualPath = hdfsPath;
+        ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
+        ret.chunkSize = configuration.getChunkSize();
+        HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
+        info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
+        ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
+        ret.opened = true;
+        return ret;
+    }
+
+    @Override
+    public final void close() throws IOException {
+        if (opened) {
+            IOUtils.closeStream(in);
+            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
+            info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
+            opened = false;
+        }
+    }
+
+    public final long next(Holder<Object> key, Holder<Object> value) {
+        long nb = fileType.next(this, key, value);
+        if (nb > 0) {
+            numOfReadBytes.addAndGet(nb);
+            numOfReadMessages.incrementAndGet();
+        }
+        return nb;
+    }
+
+    public final long getNumOfReadBytes() {
+        return numOfReadBytes.longValue();
+    }
+
+    public final long getNumOfReadMessages() {
+        return numOfReadMessages.longValue();
+    }
+
+    public final String getActualPath() {
+        return actualPath;
+    }
+
+    public final int getChunkSize() {
+        return chunkSize;
+    }
+
+    public final Closeable getIn() {
+        return in;
+    }
+
+}
\ No newline at end of file
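
A minimal read loop over this class, assuming an HdfsConfiguration already
set up for the endpoint (the path and the handle() callback are
illustrative):

    import java.io.IOException;

    import org.apache.camel.component.hdfs2.HdfsConfiguration;
    import org.apache.camel.component.hdfs2.HdfsInputStream;
    import org.apache.camel.component.hdfs2.Holder;

    public class ReadLoop {
        static void readAll(HdfsConfiguration configuration) throws IOException {
            HdfsInputStream in = HdfsInputStream.createInputStream(
                    "hdfs://localhost:9000/tmp/data", configuration);
            try {
                Holder<Object> key = new Holder<Object>();
                Holder<Object> value = new Holder<Object>();
                // next() returns the number of bytes read, or 0 at end of file
                while (in.next(key, value) > 0) {
                    handle(key.value, value.value);
                }
            } finally {
                in.close(); // renames the consumed file to its read-suffixed name
            }
        }

        static void handle(Object key, Object value) {
            System.out.println(key + " -> " + value);
        }
    }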

http://git-wip-us.apache.org/repos/asf/camel/blob/af7661ab/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsOutputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsOutputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsOutputStream.java
new file mode 100644
index 0000000..568108e
--- /dev/null
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsOutputStream.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs2;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+public class HdfsOutputStream implements Closeable {
+
+    private HdfsFileType fileType;
+    private HdfsInfo info;
+    private String actualPath;
+    private String suffixedPath;
+    private Closeable out;
+    private volatile boolean opened;
+    private final AtomicLong numOfWrittenBytes = new AtomicLong(0L);
+    private final AtomicLong numOfWrittenMessages = new AtomicLong(0L);
+    private final AtomicLong lastAccess = new AtomicLong(Long.MAX_VALUE);
+    private final AtomicBoolean busy = new AtomicBoolean(false);
+
+    protected HdfsOutputStream() {
+    }
+
+    public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException {
+        HdfsOutputStream ret = new HdfsOutputStream();
+        ret.fileType = configuration.getFileType();
+        ret.actualPath = hdfsPath;
+        ret.info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
+
+        ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
+        if (configuration.isWantAppend() || configuration.isAppend()) {
+            if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
+                configuration.setAppend(false);
+            } else {
+                configuration.setAppend(true);
+                ret.info = HdfsInfoFactory.newHdfsInfo(ret.suffixedPath);
+                ret.info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
+            }
+        } else {
+            if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
+                if (configuration.isOverwrite()) {
+                    ret.info.getFileSystem().delete(new Path(ret.actualPath), true);
+                } else {
+                    throw new RuntimeCamelException("The file already exists: " + ret.actualPath);
+                }
+            }
+        }
+        ret.out = ret.fileType.createOutputStream(ret.suffixedPath, configuration);
+        ret.opened = true;
+        return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (opened) {
+            IOUtils.closeStream(out);
+            info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath));
+            opened = false;
+        }
+    }
+
+    public void append(Object key, Object value, TypeConverter typeConverter) {
+        try {
+            busy.set(true);
+            long nb = fileType.append(this, key, value, typeConverter);
+            numOfWrittenBytes.addAndGet(nb);
+            numOfWrittenMessages.incrementAndGet();
+            lastAccess.set(System.currentTimeMillis());
+        } finally {
+            busy.set(false);
+        }
+    }
+
+    public long getNumOfWrittenBytes() {
+        return numOfWrittenBytes.longValue();
+    }
+
+    public long getNumOfWrittenMessages() {
+        return numOfWrittenMessages.longValue();
+    }
+
+    public long getLastAccess() {
+        return lastAccess.longValue();
+    }
+
+    public String getActualPath() {
+        return actualPath;
+    }
+
+    public AtomicBoolean isBusy() {
+        return busy;
+    }
+
+    public Closeable getOut() {
+        return out;
+    }
+}
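
And a sketch of the matching write side, roughly mirroring what HdfsProducer
does (the path and the key/value types are illustrative; the TypeConverter
comes from the CamelContext):

    import java.io.IOException;

    import org.apache.camel.CamelContext;
    import org.apache.camel.TypeConverter;
    import org.apache.camel.component.hdfs2.HdfsConfiguration;
    import org.apache.camel.component.hdfs2.HdfsOutputStream;

    public class WriteOne {
        static void writeOne(CamelContext context, HdfsConfiguration configuration) throws IOException {
            HdfsOutputStream out = HdfsOutputStream.createOutputStream(
                    "hdfs://localhost:9000/tmp/out", configuration);
            try {
                TypeConverter converter = context.getTypeConverter();
                // assumes keyType=LONG and valueType=TEXT in the configuration
                out.append(1L, "hello", converter);
            } finally {
                out.close(); // renames the opened-suffix working file back to the target path
            }
        }
    }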