Posted to commits@camel.apache.org by da...@apache.org on 2019/10/30 04:31:47 UTC

[camel] branch master updated: CAMEL-14076 - camel-hdfs - Make the HdfsProducer compatible with RemoteFileConsumer (from(hdfs) -> to(sftp)) (#3285)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e93a0  CAMEL-14076 - camel-hdfs - Make the HdfsProducer compatible with RemoteFileConsumer (from(hdfs) -> to(sftp)) (#3285)
46e93a0 is described below

commit 46e93a01be9adfc564cb4896521e8d0723ac5224
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Wed Oct 30 05:31:37 2019 +0100

    CAMEL-14076 - camel-hdfs - Make the HdfsProducer compatible with RemoteFileConsumer (from(hdfs) -> to(sftp)) (#3285)
    
    CAMEL-14076 : camel-hdfs - Make the HdfsProducer compatible with RemoteFileConsumer (from(hdfs) -> to(sftp))
---
 .../camel-hdfs/src/main/docs/hdfs-component.adoc   |   3 +-
 ...faultHdfsFileType.java => DefaultHdfsFile.java} |   2 +-
 ...FileType.java => HdfsArrayFileTypeHandler.java} |  23 +-
 ...pFileType.java => HdfsBloommapFileHandler.java} |  21 +-
 .../camel/component/hdfs/HdfsConfiguration.java    |  24 +-
 .../apache/camel/component/hdfs/HdfsConsumer.java  |  92 +--
 .../org/apache/camel/component/hdfs/HdfsFile.java  |   8 +-
 .../camel/component/hdfs/HdfsFileSystemType.java   |   5 +-
 .../apache/camel/component/hdfs/HdfsFileType.java  |  28 +-
 .../org/apache/camel/component/hdfs/HdfsInfo.java  |   2 +-
 .../camel/component/hdfs/HdfsInfoFactory.java      |  24 +-
 .../camel/component/hdfs/HdfsInputStream.java      |  22 +-
 ...dfsMapFileType.java => HdfsMapFileHandler.java} |  21 +-
 ...malFileType.java => HdfsNormalFileHandler.java} |  89 +--
 .../camel/component/hdfs/HdfsOutputStream.java     |  23 +-
 .../apache/camel/component/hdfs/HdfsProducer.java  |  10 +-
 ...eFileType.java => HdfsSequenceFileHandler.java} |  25 +-
 .../camel/component/hdfs/HdfsConsumerTest.java     | 642 ++++-----------------
 .../apache/camel/component/hdfs/HdfsInfoTest.java  |   2 +-
 .../camel/component/hdfs/HdfsInputStreamTest.java  | 117 ++++
 .../camel/component/hdfs/HdfsOutputStreamTest.java | 173 ++++++
 .../camel/component/hdfs/MockDataInputStream.java  | 124 ++++
 .../HdfsConsumerIntegrationTest.java}              |   5 +-
 .../src/test/resources/hdfs/normal_file.txt        |  28 +
 .../endpoint/dsl/HdfsEndpointBuilderFactory.java   |  29 +
 25 files changed, 838 insertions(+), 704 deletions(-)

diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 130f4e0..715b99d 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -93,7 +93,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (45 parameters):
+=== Query Parameters (46 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -112,6 +112,7 @@ with the following path and query parameters:
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *pattern* (consumer) | The pattern used for scanning the directory | * | String
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
+| *streamDownload* (consumer) | Sets the download method to use when not using a local working directory. If set to true, the remote files are streamed to the route as they are read. When set to false, the remote files are loaded into memory before being sent into the route. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPollStrategy
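
For context, here is a minimal route sketch wiring the new option into the from(hdfs) -> to(sftp) scenario from the commit title. The host names, paths and credentials are placeholders, not values taken from this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsToSftpRoute extends RouteBuilder {
        @Override
        public void configure() {
            // streamDownload=true streams each remote file into the route as
            // it is read, instead of loading it fully into memory first
            from("hdfs://namenode:8020/data/in?streamDownload=true")
                .to("sftp://user@sftphost/upload?password=secret");
        }
    }
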
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
similarity index 98%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 4fdcbeb..76942c8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-abstract class DefaultHdfsFileType implements HdfsFile {
+abstract class DefaultHdfsFile implements HdfsFile {
 
     protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
         long numBytes = 0;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
similarity index 73%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 8b5aa8d..35bb5a9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -27,14 +27,14 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsArrayFileType extends DefaultHdfsFileType {
+class HdfsArrayFileTypeHandler extends DefaultHdfsFile {
 
     @Override
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
         try {
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, typeConverter, valueSize);
-            ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable);
+            ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
             return valueSize.value;
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
@@ -42,9 +42,9 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
-            ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn();
+            ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn();
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(valueWritable) != null) {
@@ -60,13 +60,14 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
 
     @SuppressWarnings("rawtypes")
     @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rout;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
             rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                    configuration.getCompressionType(), () -> { });
+                    endpointConfig.getCompressionType(), () -> { });
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -74,10 +75,10 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rin;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
similarity index 79%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index fa0c0ab..422e4e6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -29,16 +29,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloommapFileType extends DefaultHdfsFileType {
+class HdfsBloommapFileHandler extends DefaultHdfsFile {
 
     @Override
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
         try {
             Holder<Integer> keySize = new Holder<>();
             Writable keyWritable = getWritable(key, typeConverter, keySize);
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, typeConverter, valueSize);
-            ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+            ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
             return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
@@ -67,15 +67,16 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
 
     @SuppressWarnings("rawtypes")
     @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rout;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
-            Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
             rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
-                    MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
                     MapFile.Writer.progressable(() -> {
                     }));
             return rout;
@@ -85,10 +86,10 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rin;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index c5d3dc4..8b8dcea 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -86,6 +86,9 @@ public class HdfsConfiguration {
     @UriParam
     private String owner;
 
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean streamDownload = false;
+
     @UriParam
     private String namedNodes;
     private List<String> namedNodeList = Collections.emptyList();
@@ -562,7 +565,7 @@ public class HdfsConfiguration {
     }
 
     public boolean hasClusterConfiguration() {
-        return !getNamedNodeList().isEmpty();
+        return !namedNodeList.isEmpty();
     }
 
     public String getKerberosConfigFileLocation() {
@@ -603,6 +606,19 @@ public class HdfsConfiguration {
         return isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation);
     }
 
+    public boolean isStreamDownload() {
+        return streamDownload;
+    }
+
+    /**
+     * Sets the download method to use when not using a local working directory.  If set to true,
+     * the remote files are streamed to the route as they are read.  When set to false, the remote files
+     * are loaded into memory before being sent into the route.
+     */
+    public void setStreamDownload(boolean streamDownload) {
+        this.streamDownload = streamDownload;
+    }
+
     /**
      * Get the label of the hdfs file system like: HOST_NAME:PORT/PATH
      *
@@ -610,7 +626,11 @@ public class HdfsConfiguration {
      * @return HOST_NAME:PORT/PATH
      */
     String getFileSystemLabel(String path) {
-        return String.format("%s:%s/%s", getHostName(), getPort(), path);
+        if (hasClusterConfiguration()) {
+            return String.format("%s/%s", getHostName(), path);
+        } else {
+            return String.format("%s:%s/%s", getHostName(), getPort(), path);
+        }
     }
 
 }
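
To make the getFileSystemLabel change concrete, a small sketch of the expected labels. The method is package-private, so this would have to live in org.apache.camel.component.hdfs, and the setter names below follow the usual Camel bean convention (they are assumptions, not shown in this diff):

    HdfsConfiguration config = new HdfsConfiguration();
    config.setHostName("namenode");
    config.setPort(8020);
    config.getFileSystemLabel("data/in");  // -> "namenode:8020/data/in"

    // with named nodes set, the host name is a logical HA nameservice that
    // has no meaningful port, so the port is omitted from the label
    config.setNamedNodes("nn1.example.com:8020,nn2.example.com:8020");
    config.getFileSystemLabel("data/in");  // -> "namenode/data/in"
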
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 52b1249..7b3f4e4 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -38,16 +38,22 @@ import org.apache.hadoop.fs.PathFilter;
 
 public final class HdfsConsumer extends ScheduledPollConsumer {
 
-    private final HdfsConfiguration config;
+    private final HdfsConfiguration endpointConfig;
     private final StringBuilder hdfsPath;
     private final Processor processor;
+    private final HdfsInfoFactory hdfsInfoFactory;
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
-    public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
+    public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration endpointConfig) {
+        this(endpoint, processor, endpointConfig, new HdfsInfoFactory(endpointConfig), endpointConfig.getFileSystemType().getHdfsPath(endpointConfig));
+    }
+
+    HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration endpointConfig, HdfsInfoFactory hdfsInfoFactory, StringBuilder hdfsPath) {
         super(endpoint, processor);
-        this.config = config;
-        this.hdfsPath = config.getFileSystemType().getHdfsPath(config);
         this.processor = processor;
+        this.endpointConfig = endpointConfig;
+        this.hdfsPath = hdfsPath;
+        this.hdfsInfoFactory = hdfsInfoFactory;
         setUseFixedDelay(true);
     }
 
@@ -60,14 +66,14 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        if (config.isConnectOnStartup()) {
+        if (endpointConfig.isConnectOnStartup()) {
             // setup hdfs if configured to do on startup
             setupHdfs(true);
         }
     }
 
     private HdfsInfo setupHdfs(boolean onStartup) throws IOException {
-        String hdfsFsDescription = config.getFileSystemLabel(hdfsPath.toString());
+        String hdfsFsDescription = endpointConfig.getFileSystemLabel(hdfsPath.toString());
         // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
         if (onStartup) {
             log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
@@ -76,7 +82,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
 
         // hadoop will cache the connection by default so its faster to get in the poll method
-        HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString(), config);
+        HdfsInfo answer = hdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString());
 
         if (onStartup) {
             log.info("Connected to hdfs file-system {}", hdfsFsDescription);
@@ -101,7 +107,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         class ExcludePathFilter implements PathFilter {
             @Override
             public boolean accept(Path path) {
-                return !(path.toString().endsWith(config.getOpenedSuffix()) || path.toString().endsWith(config.getReadSuffix()));
+                return !(path.toString().endsWith(endpointConfig.getOpenedSuffix()) || path.toString().endsWith(endpointConfig.getReadSuffix()));
             }
         }
 
@@ -110,7 +116,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         if (info.getFileSystem().isFile(info.getPath())) {
             fileStatuses = info.getFileSystem().globStatus(info.getPath());
         } else {
-            Path pattern = info.getPath().suffix("/" + this.config.getPattern());
+            Path pattern = info.getPath().suffix("/" + this.endpointConfig.getPattern());
             fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
         }
 
@@ -141,35 +147,54 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
         Holder<Object> key = new Holder<>();
         Holder<Object> value = new Holder<>();
-        while (inputStream.next(key, value) >= 0) {
-            Exchange exchange = this.getEndpoint().createExchange();
-            Message message = exchange.getIn();
-            String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
-            message.setHeader(Exchange.FILE_NAME, fileName);
-            if (key.value != null) {
-                message.setHeader(HdfsHeader.KEY.name(), key.value);
-            }
-            message.setBody(value.value);
 
-            log.debug("Processing file {}", fileName);
-            try {
-                processor.process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
+        if (this.endpointConfig.isStreamDownload()) {
+            key.value = null;
+            value.value = inputStream;
+            // use the input stream as the body
+            processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
+        } else {
+            while (inputStream.next(key, value) >= 0) {
+                processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
             }
+        }
+    }
 
-            // in case of unhandled exceptions then let the exception handler handle them
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException(exchange.getException());
-            }
+    private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
+        Exchange exchange = this.getEndpoint().createExchange();
+        Message message = exchange.getIn();
+        String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
+        message.setHeader(Exchange.FILE_NAME, fileName);
+        message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
+        message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
+        if (key.value != null) {
+            message.setHeader(HdfsHeader.KEY.name(), key.value);
+        }
 
-            int count = messageCount.incrementAndGet();
-            log.debug("Processed [{}] files out of [{}]", count, totalFiles);
+        if (inputStream.getNumOfReadBytes() >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
         }
+
+        message.setBody(value.value);
+
+        log.debug("Processing file {}", fileName);
+        try {
+            processor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        // in case of unhandled exceptions then let the exception handler handle them
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException(exchange.getException());
+        }
+
+        int count = messageCount.incrementAndGet();
+        log.debug("Processed [{}] files out of [{}]", count, totalFiles);
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
-        if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && fileStatus.isDirectory()) {
+        if (endpointConfig.getFileType().equals(HdfsFileType.NORMAL_FILE) && fileStatus.isDirectory()) {
             try {
                 Path successPath = new Path(fileStatus.getPath().toString() + "/_SUCCESS");
                 if (!info.getFileSystem().exists(successPath)) {
@@ -183,9 +208,9 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     }
 
     private boolean hasMatchingOwner(FileStatus fileStatus) {
-        if (config.getOwner() != null && !config.getOwner().equals(fileStatus.getOwner())) {
+        if (endpointConfig.getOwner() != null && !endpointConfig.getOwner().equals(fileStatus.getOwner())) {
             if (log.isDebugEnabled()) {
-                log.debug("Skipping file: {} as not matching owner: {}", fileStatus.getPath(), config.getOwner());
+                log.debug("Skipping file: {} as not matching owner: {}", fileStatus.getPath(), endpointConfig.getOwner());
             }
             return false;
         }
@@ -195,8 +220,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     private HdfsInputStream createInputStream(FileStatus fileStatus) {
         try {
             this.rwLock.writeLock().lock();
-
-            return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config);
+            return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
         } finally {
             this.rwLock.writeLock().unlock();
         }
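
Since the consumer now populates RemoteFile-style headers (CamelFileName, CamelFileNameConsumed, CamelFileAbsolutePath and CamelFileLength, added above), downstream processors can read them just as they would for an FTP/SFTP consumer. A hedged sketch, with a placeholder endpoint URI:

    import java.io.InputStream;
    import org.apache.camel.Exchange;
    import org.apache.camel.builder.RouteBuilder;

    public class HdfsHeadersRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("hdfs://namenode:8020/data/in?streamDownload=true")
                .process(exchange -> {
                    // in streamDownload mode the body is the open input stream
                    InputStream body = exchange.getIn().getBody(InputStream.class);
                    String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
                    Long length = exchange.getIn().getHeader(Exchange.FILE_LENGTH, Long.class);
                    String absolutePath = exchange.getIn().getHeader("CamelFileAbsolutePath", String.class);
                });
        }
    }
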
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
index f08cb18..1e2d63f 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
@@ -22,12 +22,12 @@ import org.apache.camel.TypeConverter;
 
 interface HdfsFile {
 
-    long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter);
+    Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
 
-    long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
+    long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter);
 
-    Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration);
+    Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
 
-    Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration);
+    long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
 
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
index 744fdab..75ec6a1 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
@@ -37,8 +37,9 @@ public enum HdfsFileSystemType {
             StringBuilder hpath = new StringBuilder();
             hpath.append("hdfs://");
             hpath.append(config.getHostName());
-            hpath.append(':');
-            hpath.append(config.getPort());
+            if (!config.hasClusterConfiguration()) {
+                hpath.append(':').append(config.getPort());
+            }
             hpath.append(config.getPath());
             if (config.hasSplitStrategies()) {
                 hpath.append('/');
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index fe18006..7d4a239 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -22,32 +22,32 @@ import org.apache.camel.TypeConverter;
 
 public enum HdfsFileType {
 
-    NORMAL_FILE(new HdfsNormalFileType()),
-    SEQUENCE_FILE(new HdfsSequenceFileType()),
-    MAP_FILE(new HdfsMapFileType()),
-    BLOOMMAP_FILE(new HdfsBloommapFileType()),
-    ARRAY_FILE(new HdfsArrayFileType());
+    NORMAL_FILE(new HdfsNormalFileHandler()),
+    SEQUENCE_FILE(new HdfsSequenceFileHandler()),
+    MAP_FILE(new HdfsMapFileHandler()),
+    BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
+    ARRAY_FILE(new HdfsArrayFileTypeHandler());
 
     private final HdfsFile file;
 
-    private HdfsFileType(HdfsFile file) {
+    HdfsFileType(HdfsFile file) {
         this.file = file;
     }
 
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-        return this.file.append(hdfsostr, key, value, typeConverter);
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        return this.file.createOutputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
-        return this.file.next(hdfsInputStream, key, value);
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+        return this.file.append(hdfsOutputStream, key, value, typeConverter);
     }
 
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-        return this.file.createOutputStream(hdfsPath, configuration);
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-        return this.file.createInputStream(hdfsPath, configuration);
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+        return this.file.next(hdfsInputStream, key, value);
     }
 
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
index ff8035e..556ba9e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
@@ -20,7 +20,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-final class HdfsInfo {
+class HdfsInfo {
 
     private final Configuration configuration;
     private final FileSystem fileSystem;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
index 6bb47da..3f2f619 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
@@ -25,13 +25,27 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-public final class HdfsInfoFactory {
+class HdfsInfoFactory {
 
-    private HdfsInfoFactory() {
-        // hidden
+    private final HdfsConfiguration endpointConfig;
+
+    HdfsInfoFactory(HdfsConfiguration endpointConfig) {
+        this.endpointConfig = endpointConfig;
+    }
+
+    HdfsInfo newHdfsInfo(String hdfsPath) throws IOException {
+        return newHdfsInfo(hdfsPath, endpointConfig);
+    }
+
+    HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath) throws IOException {
+        return newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
+    }
+
+    HdfsConfiguration getEndpointConfig() {
+        return endpointConfig;
     }
 
-    static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+    private static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
         javax.security.auth.login.Configuration auth = HdfsComponent.getJAASConfiguration();
         try {
@@ -41,7 +55,7 @@ public final class HdfsInfoFactory {
         }
     }
 
-    static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+    private static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
         Configuration configuration = newConfiguration(endpointConfig);
 
         authenticate(configuration, endpointConfig);
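
The factory is now instance-based: call sites hold one HdfsInfoFactory per endpoint configuration and pass only the path. A condensed sketch of the new call pattern, mirroring the call sites changed in this commit (endpointConfig stands for an existing HdfsConfiguration, and the code must live in org.apache.camel.component.hdfs since the types are package-private):

    HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(endpointConfig);
    HdfsInfo info = hdfsInfoFactory.newHdfsInfo("hdfs://namenode:8020/data/in/part-00000");
    FileSystem fileSystem = info.getFileSystem();
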
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 49e3ddc..8f12aef 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -47,23 +47,24 @@ public class HdfsInputStream implements Closeable {
     /**
      *
      * @param hdfsPath
-     * @param configuration
+     * @param hdfsInfoFactory factory supplying the HdfsInfo (file system, configuration, path) used to open the stream
      * @return
      * @throws IOException
      */
-    public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public static HdfsInputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
         HdfsInputStream iStream = new HdfsInputStream();
-        iStream.fileType = configuration.getFileType();
+        iStream.fileType = endpointConfig.getFileType();
         iStream.actualPath = hdfsPath;
-        iStream.suffixedPath = iStream.actualPath + '.' + configuration.getOpenedSuffix();
-        iStream.suffixedReadPath = iStream.actualPath + '.' + configuration.getReadSuffix();
-        iStream.chunkSize = configuration.getChunkSize();
+        iStream.suffixedPath = iStream.actualPath + '.' + endpointConfig.getOpenedSuffix();
+        iStream.suffixedReadPath = iStream.actualPath + '.' + endpointConfig.getReadSuffix();
+        iStream.chunkSize = endpointConfig.getChunkSize();
         try {
-            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(iStream.actualPath, configuration);
+            HdfsInfo info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath);
             if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
-                iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, configuration);
+                iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, hdfsInfoFactory);
                 iStream.opened = true;
-                iStream.config = configuration;
+                iStream.config = endpointConfig;
             } else {
                 LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
                 iStream = null;
@@ -79,7 +80,8 @@ public class HdfsInputStream implements Closeable {
     public final void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(in);
-            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath, config);
+            HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config);
+            HdfsInfo info = hdfsInfoFactory.newHdfsInfo(actualPath);
             info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
             opened = false;
         }
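
The input stream keeps its bookkeeping via renames using the opened/read suffixes (see above). A condensed lifecycle sketch, paraphrased from the code rather than copied:

    // open: renames part-00000 to part-00000.opened (default suffixes
    // assumed) and starts reading; returns null if the rename fails
    // because the file no longer exists
    HdfsInputStream in = HdfsInputStream.createInputStream(
            "hdfs://namenode:8020/data/in/part-00000", hdfsInfoFactory);

    // ... consume via in.next(key, value), or hand the stream to the route ...

    // close: renames part-00000.opened to part-00000.read so that the
    // consumer's ExcludePathFilter skips it on subsequent polls
    in.close();
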
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
similarity index 79%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index fbd3e04..1c19162 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -28,16 +28,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsMapFileType extends DefaultHdfsFileType {
+class HdfsMapFileHandler extends DefaultHdfsFile {
 
     @Override
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
         try {
             Holder<Integer> keySize = new Holder<>();
             Writable keyWritable = getWritable(key, typeConverter, keySize);
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, typeConverter, valueSize);
-            ((MapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+            ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
             return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
@@ -66,14 +66,15 @@ class HdfsMapFileType extends DefaultHdfsFileType {
 
     @Override
     @SuppressWarnings("rawtypes")
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rout;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
-            Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
             rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
-                    MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
                     MapFile.Writer.progressable(() -> {
                     }));
             return rout;
@@ -83,10 +84,10 @@ class HdfsMapFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rin;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
similarity index 70%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 43df23a..952bde8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -33,14 +33,42 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
-class HdfsNormalFileType extends DefaultHdfsFileType {
+class HdfsNormalFileHandler extends DefaultHdfsFile {
 
     @Override
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            FSDataOutputStream rout;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            if (endpointConfig.isAppend()) {
+                rout = hdfsInfo.getFileSystem().append(
+                        hdfsInfo.getPath(),
+                        endpointConfig.getBufferSize(),
+                    () -> { }
+                );
+            } else {
+                rout = hdfsInfo.getFileSystem().create(
+                        hdfsInfo.getPath(),
+                        endpointConfig.isOverwrite(),
+                        endpointConfig.getBufferSize(),
+                        endpointConfig.getReplication(),
+                        endpointConfig.getBlockSize(),
+                    () -> { }
+                );
+            }
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
         InputStream is = null;
         try {
             is = typeConverter.convertTo(InputStream.class, value);
-            return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
+            return copyBytes(is, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         } finally {
@@ -49,6 +77,23 @@ class HdfsNormalFileType extends DefaultHdfsFileType {
     }
 
     @Override
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            Closeable rin;
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            if (endpointConfig.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
+                HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+                rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+            } else {
+                rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig));
+            }
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
     public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
@@ -70,40 +115,7 @@ class HdfsNormalFileType extends DefaultHdfsFileType {
         }
     }
 
-    @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-        try {
-            Closeable rout;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            if (!configuration.isAppend()) {
-                rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(),
-                        configuration.getReplication(), configuration.getBlockSize(), () -> { });
-            } else {
-                rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), () -> { });
-            }
-            return rout;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
-
-    @Override
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-        try {
-            Closeable rin;
-            if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
-            } else {
-                rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration));
-            }
-            return rin;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
-
-    private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
+    private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
         try {
             String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
 
@@ -123,7 +135,8 @@ class HdfsNormalFileType extends DefaultHdfsFileType {
                 outputDest.delete();
             }
 
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(configuration);
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             FileSystem fileSystem = hdfsInfo.getFileSystem();
             FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf());
             try {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index 49a3a00..b93b93e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -42,34 +42,35 @@ public class HdfsOutputStream implements Closeable {
     protected HdfsOutputStream() {
     }
 
-    public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException {
+    public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) throws IOException {
+        HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
         HdfsOutputStream oStream = new HdfsOutputStream();
-        oStream.fileType = configuration.getFileType();
+        oStream.fileType = endpointConfig.getFileType();
         oStream.actualPath = hdfsPath;
-        oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.actualPath, configuration);
+        oStream.info = hdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.actualPath);
 
-        oStream.suffixedPath = oStream.actualPath + '.' + configuration.getOpenedSuffix();
+        oStream.suffixedPath = oStream.actualPath + '.' + endpointConfig.getOpenedSuffix();
 
         Path actualPath = new Path(oStream.actualPath);
         boolean actualPathExists = oStream.info.getFileSystem().exists(actualPath);
 
-        if (configuration.isWantAppend() || configuration.isAppend()) {
+        if (endpointConfig.isWantAppend() || endpointConfig.isAppend()) {
             if (actualPathExists) {
-                configuration.setAppend(true);
-                oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.suffixedPath, configuration);
+                endpointConfig.setAppend(true);
+                oStream.info = hdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.suffixedPath);
                 oStream.info.getFileSystem().rename(actualPath, new Path(oStream.suffixedPath));
             } else {
-                configuration.setAppend(false);
+                endpointConfig.setAppend(false);
             }
         } else if (actualPathExists && !oStream.info.getFileSystem().isDirectory(actualPath)) { // only check if not directory
-            if (configuration.isOverwrite()) {
+            if (endpointConfig.isOverwrite()) {
                 oStream.info.getFileSystem().delete(actualPath, true);
             } else {
-                throw new RuntimeCamelException("The file already exists");
+                throw new RuntimeCamelException("File [" + actualPath + "] already exists");
             }
         }
 
-        oStream.out = oStream.fileType.createOutputStream(oStream.suffixedPath, configuration);
+        oStream.out = oStream.fileType.createOutputStream(oStream.suffixedPath, hdfsInfoFactory);
         oStream.opened = true;
         return oStream;
     }
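
The producer-side counterpart follows the same factory-based pattern, and with overwrite=false the failure now names the offending path. A condensed sketch (placeholder path; hdfsInfoFactory as above):

    // throws RuntimeCamelException("File [...] already exists") if the target
    // exists, overwrite=false and append was not requested
    HdfsOutputStream out = HdfsOutputStream.createOutputStream(
            "hdfs://namenode:8020/data/out/part-00000", hdfsInfoFactory);
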
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 048a120..8f407d8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -139,7 +139,8 @@ public class HdfsProducer extends DefaultProducer {
             log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
         }
 
-        HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+        HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config);
+        HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), hdfsInfoFactory);
 
         if (onStartup) {
             log.info("Connected to hdfs file-system {}", hdfsFsDescription);
@@ -187,13 +188,14 @@ public class HdfsProducer extends DefaultProducer {
         Object body = exchange.getIn().getBody();
         Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
 
+        HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config);
         // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath
         if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) {
             if (oStream != null) {
                 IOHelper.close(oStream, "output stream", log);
             }
             StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange);
-            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), hdfsInfoFactory);
         } else if (oStream == null) {
             // must have oStream
             oStream = setupHdfs(false);
@@ -204,7 +206,7 @@ public class HdfsProducer extends DefaultProducer {
                 IOHelper.close(oStream, "output stream", log);
             }
             StringBuilder actualPath = newFileName();
-            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), hdfsInfoFactory);
         }
 
         String path = oStream.getActualPath();
@@ -215,7 +217,7 @@ public class HdfsProducer extends DefaultProducer {
 
         // close if we do not have idle checker task to do this for us
         boolean close = scheduler == null;
-        // but user may have a header to explict control the close
+        // but the user may have a header to explicitly control the close
         Boolean closeHeader = exchange.getIn().getHeader(HdfsConstants.HDFS_CLOSE, Boolean.class);
         if (closeHeader != null) {
             close = closeHeader;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
similarity index 75%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 159183a..936a2a8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -26,16 +26,16 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsSequenceFileType extends DefaultHdfsFileType {
+class HdfsSequenceFileHandler extends DefaultHdfsFile {
 
     @Override
-    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
         try {
             Holder<Integer> keySize = new Holder<>();
             Writable keyWritable = getWritable(key, typeConverter, keySize);
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, typeConverter, valueSize);
-            SequenceFile.Writer writer = (SequenceFile.Writer) hdfsostr.getOut();
+            SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
             return Long.sum(keySize.value, valueSize.value);
@@ -65,16 +65,17 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rout;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
-            Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
             rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(configuration.getBufferSize()),
-                    SequenceFile.Writer.replication(configuration.getReplication()), SequenceFile.Writer.blockSize(configuration.getBlockSize()),
-                    SequenceFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+                    SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
                     SequenceFile.Writer.progressable(() -> {
                     }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
             return rout;
@@ -84,10 +85,10 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
             Closeable rin;
-            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
             return rin;
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index f0ec9de..cc45053 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -16,566 +16,146 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.Exchange;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayFile;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.Progressable;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.hadoop.io.SequenceFile.CompressionType;
-import static org.hamcrest.CoreMatchers.equalTo;
-
-public class HdfsConsumerTest extends HdfsTestSupport {
-    private static final int ITERATIONS = 200;
-
-    @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
+import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_OPENED_SUFFIX;
+import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_READ_SUFFIX;
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HdfsConsumerTest {
+
+    private HdfsEndpoint endpoint;
+    private Processor processor;
+    private HdfsConfiguration endpointConfig;
+    private HdfsInfoFactory hdfsInfoFactory;
+    private CamelContext context;
+    private FileSystem fileSystem;
+    private Configuration configuration;
+
+    private HdfsConsumer underTest;
 
-    @Override
     @Before
     public void setUp() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        // must be able to get security configuration
-        try {
-            javax.security.auth.login.Configuration.getConfiguration();
-        } catch (Exception e) {
-            return;
-        }
-
-        deleteDirectory("target/test");
-        super.setUp();
-    }
-
-    @Test
-    public void testSimpleConsumer() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        for (int i = 0; i < 1024; ++i) {
-            out.write(("PIPPO" + i).getBytes("UTF-8"));
-            out.flush();
-        }
-        out.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(2);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testConcurrentConsumers() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final File rootdir = CWD;
-        final File dir = new File("target/test/multiple-consumers");
-        dir.mkdirs();
-        for (int i = 1; i <= ITERATIONS; i++) {
-            FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
-            fos.write(String.format("hello (%04d)\n", i).getBytes());
-            fos.close();
-        }
-
-        final Set<String> fileNames = new HashSet<>();
-        final CountDownLatch latch = new CountDownLatch(ITERATIONS);
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.whenAnyExchangeReceived(new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
-                latch.countDown();
-            }
-        });
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs://" + rootdir.toURI() + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0").to("mock:result");
-                from("hdfs://" + rootdir.toURI() + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0").to("mock:result");
-                from("hdfs://" + rootdir.toURI() + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0").to("mock:result");
-                from("hdfs://" + rootdir.toURI() + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.expectedMessageCount(ITERATIONS);
-
-        latch.await(30, TimeUnit.SECONDS);
-
-        resultEndpoint.assertIsSatisfied();
-        assertThat(fileNames.size(), equalTo(ITERATIONS));
-    }
-
-    @Test
-    public void testSimpleConsumerWithEmptyFile() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        out.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        // TODO: See comment from Claus at ticket: https://issues.apache.org/jira/browse/CAMEL-8434
-        resultEndpoint.expectedMinimumMessageCount(1);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        Thread.sleep(2000);
-
-        resultEndpoint.assertIsSatisfied();
-        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0));
-    }
-
-    @Test
-    public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        // size = 5 times chunk size = 210 bytes
-        for (int i = 0; i < 42; ++i) {
-            out.write(new byte[] {0x61, 0x62, 0x63, 0x64, 0x65});
-            out.flush();
-        }
-        out.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(5);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42));
-    }
-
-    @Test
-    public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(0);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadWithReadSuffix() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        String[] beforeFiles = new File("target/test").list();
-        int before = beforeFiles != null ? beforeFiles.length : 0;
-
-        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        BooleanWritable valueWritable = new BooleanWritable();
-        valueWritable.set(true);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.getParent().toUri() + "?scheduler=#myScheduler&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled")
-                    .to("mock:result");
-            }
-        });
-        ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1);
-        DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool);
-        context.getRegistry().bind("myScheduler", scheduler);
-        context.start();
+        endpoint = mock(HdfsEndpoint.class);
+        processor = mock(Processor.class);
+        endpointConfig = mock(HdfsConfiguration.class);
+        hdfsInfoFactory = mock(HdfsInfoFactory.class);
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        resultEndpoint.assertIsSatisfied();
+        HdfsInfo hdfsInfo = mock(HdfsInfo.class);
+        fileSystem = mock(FileSystem.class);
+        configuration = mock(Configuration.class);
+        Path path = mock(Path.class);
 
-        // synchronize on pool that was used to run hdfs consumer thread
-        scheduler.getScheduledExecutorService().shutdown();
-        scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
+        when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
+        when(hdfsInfoFactory.getEndpointConfig()).thenReturn(endpointConfig);
 
-        Set<String> files = new HashSet<>(Arrays.asList(new File("target/test").list()));
-        // there may be some leftover files before, so test that we only added 2 new files
-        assertThat(files.size() - before, equalTo(2));
-        assertTrue(files.remove("test-camel-boolean.handled"));
-        assertTrue(files.remove(".test-camel-boolean.handled.crc"));
-    }
-
-    @Test
-    public void testReadBoolean() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        BooleanWritable valueWritable = new BooleanWritable();
-        valueWritable.set(true);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadByte() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-byte").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, ByteWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        ByteWritable valueWritable = new ByteWritable();
-        byte value = 3;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        resultEndpoint.message(0).body(byte.class).isEqualTo(3);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadFloat() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-float").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, FloatWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        FloatWritable valueWritable = new FloatWritable();
-        float value = 3.1415926535f;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadDouble() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-double").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, DoubleWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        DoubleWritable valueWritable = new DoubleWritable();
-        double value = 3.1415926535;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
+        when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
+        when(hdfsInfo.getConfiguration()).thenReturn(configuration);
+        when(hdfsInfo.getPath()).thenReturn(path);
 
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
+        when(endpointConfig.getReadSuffix()).thenReturn(DEFAULT_READ_SUFFIX);
+        when(endpointConfig.getOpenedSuffix()).thenReturn(DEFAULT_OPENED_SUFFIX);
 
-        resultEndpoint.assertIsSatisfied();
+        context = new DefaultCamelContext();
     }
 
     @Test
-    public void testReadInt() throws Exception {
-        if (skipTest()) {
-            return;
-        }
+    public void doStartWithoutHdfsSetup() throws Exception {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+        when(endpointConfig.getPath()).thenReturn(hdfsPath);
+        when(endpointConfig.isConnectOnStartup()).thenReturn(false);
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
 
-        final Path file = new Path(new File("target/test/test-camel-int").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, IntWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        IntWritable valueWritable = new IntWritable();
-        int value = 314159265;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
+        // when
+        underTest.doStart();
 
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
+        // then
+        verify(hdfsInfoFactory, times(0)).newHdfsInfo(anyString());
 
-        resultEndpoint.assertIsSatisfied();
     }
 
     @Test
-    public void testReadLong() throws Exception {
-        if (skipTest()) {
-            return;
-        }
+    public void doStartWithHdfsSetup() throws Exception {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+        when(endpointConfig.getPath()).thenReturn(hdfsPath);
+        when(endpointConfig.isConnectOnStartup()).thenReturn(true);
+        when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
 
-        final Path file = new Path(new File("target/test/test-camel-long").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, LongWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        LongWritable valueWritable = new LongWritable();
-        long value = 31415926535L;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
+        // when
+        underTest.doStart();
 
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
+        // then
+        verify(hdfsInfoFactory, times(1)).newHdfsInfo(hdfsPath);
 
-        resultEndpoint.assertIsSatisfied();
     }
 
     @Test
-    public void testReadBytes() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-bytes").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BytesWritable.class);
-        NullWritable keyWritable = NullWritable.get();
-        BytesWritable valueWritable = new BytesWritable();
-        String value = "CIAO!";
-        valueWritable.set(value.getBytes(), 0, value.getBytes().length);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadString() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, Text.class);
-        NullWritable keyWritable = NullWritable.get();
-        Text valueWritable = new Text();
-        String value = "CIAO!";
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Test
-    public void testReadStringArrayFile() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs1 = FileSystem.get(file.toUri(), conf);
-        ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs1, "target/test/test-camel-string1", Text.class, CompressionType.NONE, new Progressable() {
-            @Override
-            public void progress() {
-            }
-        });
-        Text valueWritable = new Text();
-        String value = "CIAO!";
-        valueWritable.set(value);
-        writer.append(valueWritable);
-        writer.close();
-
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-
-        context.addRoutes(new RouteBuilder() {
-            public void configure() {
-                from("hdfs:localhost/" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0").to("mock:result");
-            }
-        });
-        context.start();
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        if (skipTest()) {
-            return;
-        }
-
-        super.tearDown();
-        Thread.sleep(100);
-        Configuration conf = new Configuration();
-        Path dir = new Path("target/test");
-        FileSystem fs = FileSystem.get(dir.toUri(), conf);
-        fs.delete(dir, true);
-    }
-
-    private Writer createWriter(Configuration conf, Path file, Class<?> keyClass,
-        Class<?> valueClass) throws IOException {
-        return SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
-                SequenceFile.Writer.keyClass(keyClass), SequenceFile.Writer.valueClass(valueClass));
-    }
-
-}
+    public void doPollFromExistingLocalFile() throws Exception {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.getPath()).thenReturn(hdfsPath);
+        when(endpointConfig.getOwner()).thenReturn("spiderman");
+        when(endpointConfig.isConnectOnStartup()).thenReturn(true);
+        when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(endpoint.createExchange()).thenReturn(new DefaultExchange(context));
+        when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
+
+        when(fileSystem.isFile(any(Path.class))).thenReturn(true);
+
+        FileStatus[] fileStatuses = new FileStatus[1];
+        FileStatus fileStatus = mock(FileStatus.class);
+        fileStatuses[0] = fileStatus;
+        when(fileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(fileStatus.getPath()).thenReturn(new Path(hdfsPath));
+        when(fileStatus.isFile()).thenReturn(true);
+        when(fileStatus.isDirectory()).thenReturn(false);
+        when(fileStatus.getOwner()).thenReturn("spiderman");
+
+        String normalFile = CWD.getAbsolutePath() + "/src/test/resources/hdfs/normal_file.txt";
+        FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockDataInputStream(normalFile));
+        when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
+        when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+
+        underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
+
+        // when
+        int actual = underTest.doPoll();
+
+        // then
+        assertThat(actual, is(1));
+    }
+
+}
\ No newline at end of file
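
The rewritten test above drives HdfsConsumer.doPoll() directly against Mockito
mocks instead of starting a route. To additionally assert what reached the route,
a hedged extension could capture the exchange handed to the injected Processor
(assuming doPoll() delivers exactly one exchange to it in this scenario):

    ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(1)).process(exchangeCaptor.capture());
    // exchangeCaptor.getValue() now holds the message built from normal_file.txt
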
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
index eff5fa9..0405b6c 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
@@ -35,7 +35,7 @@ public class HdfsInfoTest {
         HdfsConfiguration endpointConfig = mock(HdfsConfiguration.class);
 
         // when
-        underTest = HdfsInfoFactory.newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
+        underTest = new HdfsInfoFactory(endpointConfig).newHdfsInfoWithoutAuth(hdfsPath);
 
         // then
         assertThat(underTest, notNullValue());
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
new file mode 100644
index 0000000..ce6b334
--- /dev/null
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HdfsInputStreamTest {
+
+    private HdfsInfoFactory hdfsInfoFactory;
+    private HdfsConfiguration endpointConfig;
+    private FileSystem fileSystem;
+    private Configuration configuration;
+
+    private HdfsInputStream underTest;
+
+    @Before
+    public void setUp() throws Exception {
+        hdfsInfoFactory = mock(HdfsInfoFactory.class);
+        HdfsInfo hdfsInfo = mock(HdfsInfo.class);
+        endpointConfig = mock(HdfsConfiguration.class);
+
+        fileSystem = mock(FileSystem.class);
+        configuration = mock(Configuration.class);
+        Path path = mock(Path.class);
+
+        when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
+        when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
+        when(hdfsInfoFactory.getEndpointConfig()).thenReturn(endpointConfig);
+
+        when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
+        when(hdfsInfo.getConfiguration()).thenReturn(configuration);
+        when(hdfsInfo.getPath()).thenReturn(path);
+    }
+
+    @Test
+    public void createInputStreamForLocalNormalFile() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataInputStream fsDataInputStream = mock(FSDataInputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+
+        when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
+        when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+
+        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+        // when
+        underTest = HdfsInputStream.createInputStream(hdfsPath, hdfsInfoFactory);
+
+        // then
+        assertThat(underTest, notNullValue());
+        verify(fileSystem, times(1)).rename(any(Path.class), pathCaptor.capture());
+        assertThat(pathCaptor.getValue().toString(), is("hdfs://localhost/target/test/multiple-consumers.null"));
+
+        assertThat(underTest.getNumOfReadBytes(), is(0L));
+        assertThat(underTest.getNumOfReadMessages(), is(0L));
+        assertThat(underTest.getActualPath(), is(hdfsPath));
+        assertThat(underTest.getChunkSize(), is(0));
+        assertThat(underTest.isOpened(), is(true));
+    }
+
+    @Test
+    public void createInputStreamForMissingNormalFile() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataInputStream fsDataInputStream = mock(FSDataInputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+
+        when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(false);
+        when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+
+        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+        // when
+        underTest = HdfsInputStream.createInputStream(hdfsPath, hdfsInfoFactory);
+
+        // then
+        assertThat(underTest, nullValue());
+        verify(fileSystem, times(1)).rename(any(Path.class), any(Path.class));
+    }
+
+}
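
Note on the first assertion above: the rename target ends in ".null" because this
test never stubs endpointConfig.getOpenedSuffix(), so the suffix appended on open
is the null returned by the mock. A variant that pins the suffix down explicitly
(a sketch using the constant already imported by HdfsConsumerTest):

    when(endpointConfig.getOpenedSuffix()).thenReturn(HdfsConstants.DEFAULT_OPENED_SUFFIX);
    // the captured rename path would then end in "." + HdfsConstants.DEFAULT_OPENED_SUFFIX
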
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsOutputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsOutputStreamTest.java
new file mode 100644
index 0000000..a736c51
--- /dev/null
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsOutputStreamTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HdfsOutputStreamTest {
+
+    private HdfsInfoFactory hdfsInfoFactory;
+    private HdfsConfiguration endpointConfig;
+    private FileSystem fileSystem;
+
+    private HdfsOutputStream underTest;
+
+    @Before
+    public void setUp() throws Exception {
+        hdfsInfoFactory = mock(HdfsInfoFactory.class);
+        HdfsInfo hdfsInfo = mock(HdfsInfo.class);
+        endpointConfig = mock(HdfsConfiguration.class);
+
+        fileSystem = mock(FileSystem.class);
+        Configuration configuration = mock(Configuration.class);
+        Path path = mock(Path.class);
+
+        when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
+        when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
+        when(hdfsInfoFactory.getEndpointConfig()).thenReturn(endpointConfig);
+
+        when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
+        when(hdfsInfo.getConfiguration()).thenReturn(configuration);
+        when(hdfsInfo.getPath()).thenReturn(path);
+    }
+
+    @Test
+    public void createOutputStreamForExistingNormalFileWithAppend() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.isWantAppend()).thenReturn(true);
+        when(endpointConfig.isAppend()).thenReturn(false);
+
+        when(fileSystem.exists(any(Path.class))).thenReturn(true);
+        when(fileSystem.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong(), any(Progressable.class))).thenReturn(fsDataOutputStream);
+
+        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+        // when
+        underTest = HdfsOutputStream.createOutputStream(hdfsPath, hdfsInfoFactory);
+
+        // then
+        assertThat(underTest, notNullValue());
+        verify(endpointConfig, times(1)).setAppend(true);
+        verify(fileSystem, times(1)).rename(any(Path.class), pathCaptor.capture());
+        assertThat(pathCaptor.getValue().toString(), is("hdfs://localhost/target/test/multiple-consumers.null"));
+    }
+
+    @Test
+    public void createOutputStreamForMissingNormalFileWithAppend() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.isWantAppend()).thenReturn(true);
+        when(endpointConfig.isAppend()).thenReturn(false);
+
+        when(fileSystem.exists(any(Path.class))).thenReturn(false);
+        when(fileSystem.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong(), any(Progressable.class))).thenReturn(fsDataOutputStream);
+
+        // when
+        underTest = HdfsOutputStream.createOutputStream(hdfsPath, hdfsInfoFactory);
+
+        // then
+        assertThat(underTest, notNullValue());
+        verify(endpointConfig, times(1)).setAppend(false);
+        verify(fileSystem, times(0)).rename(any(Path.class), any(Path.class));
+    }
+
+    @Test
+    public void createOutputStreamOverwriteExistingNormalFile() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.isWantAppend()).thenReturn(false);
+        when(endpointConfig.isAppend()).thenReturn(false);
+        when(endpointConfig.isOverwrite()).thenReturn(true);
+
+        when(fileSystem.exists(any(Path.class))).thenReturn(true);
+        when(fileSystem.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong(), any(Progressable.class))).thenReturn(fsDataOutputStream);
+
+        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+        // when
+        underTest = HdfsOutputStream.createOutputStream(hdfsPath, hdfsInfoFactory);
+
+        // then
+        assertThat(underTest, notNullValue());
+        verify(fileSystem, times(1)).delete(pathCaptor.capture(), eq(true));
+        assertThat(pathCaptor.getValue().toString(), is(hdfsPath));
+
+        assertThat(underTest.getNumOfWrittenBytes(), is(0L));
+        assertThat(underTest.getNumOfWrittenMessages(), is(0L));
+        assertThat(underTest.getActualPath(), is(hdfsPath));
+        assertThat(underTest.getLastAccess() > 0L, is(true));
+        assertThat(underTest.isBusy().get(), is(false));
+    }
+
+    @Test
+    public void createOutputStreamWillFailForExistingNormalFileNoOverwrite() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        FSDataOutputStream fsDataOutputStream = mock(FSDataOutputStream.class);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.isWantAppend()).thenReturn(false);
+        when(endpointConfig.isAppend()).thenReturn(false);
+        when(endpointConfig.isOverwrite()).thenReturn(false);
+
+        when(fileSystem.exists(any(Path.class))).thenReturn(true);
+        when(fileSystem.create(any(Path.class), anyBoolean(), anyInt(), anyShort(), anyLong(), any(Progressable.class))).thenReturn(fsDataOutputStream);
+
+        // when
+        Throwable expected = null;
+        try {
+            underTest = HdfsOutputStream.createOutputStream(hdfsPath, hdfsInfoFactory);
+        } catch (Exception e) {
+            expected = e;
+        }
+
+        // then
+        assertThat(expected, notNullValue());
+        assertThat(expected.getMessage(), is("File [hdfs://localhost/target/test/multiple-consumers] already exists"));
+    }
+
+}
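
The last test above asserts the failure through a manual try/catch. Under JUnit 4
the same check could use the ExpectedException rule instead; a sketch (the
concrete exception type is not pinned down here, only its message):

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void createOutputStreamWillFailForExistingNormalFileNoOverwrite() throws IOException {
        // ... same stubbing as in the test above ...
        thrown.expectMessage("already exists");
        HdfsOutputStream.createOutputStream(hdfsPath, hdfsInfoFactory);
    }
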
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/MockDataInputStream.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/MockDataInputStream.java
new file mode 100644
index 0000000..8bbf449
--- /dev/null
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/MockDataInputStream.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+public class MockDataInputStream extends FSInputStream implements Seekable, PositionedReadable {
+
+    private final FileInputStream fis;
+    private long position;
+
+    MockDataInputStream(String targetFile) throws FileNotFoundException {
+        this(new FileInputStream(targetFile));
+    }
+
+    MockDataInputStream(FileInputStream fis) {
+        this.fis = fis;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        if (pos < 0) {
+            throw new EOFException(
+                    FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        fis.getChannel().position(pos);
+        this.position = pos;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return this.position;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return fis.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        fis.close();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        try {
+            int value = fis.read();
+            if (value >= 0) {
+                this.position++;
+            }
+            return value;
+        } catch (IOException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        try {
+            int value = fis.read(b, off, len);
+            if (value > 0) {
+                this.position += value;
+            }
+            return value;
+        } catch (IOException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public int read(long position, byte[] b, int off, int len)
+            throws IOException {
+        ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+        try {
+            return fis.getChannel().read(bb, position);
+        } catch (IOException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        long value = fis.skip(n);
+        if (value > 0) {
+            this.position += value;
+        }
+        return value;
+    }
+
+}
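
MockDataInputStream implements just enough of Seekable/PositionedReadable to be
wrapped in Hadoop's FSDataInputStream, which is how the consumer test feeds a
local fixture file through the mocked FileSystem:

    FSDataInputStream fsDataInputStream = new FSDataInputStream(
            new MockDataInputStream("src/test/resources/hdfs/normal_file.txt"));
    when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
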
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationTest.java
similarity index 99%
copy from components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
copy to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationTest.java
index f0ec9de..d5a10dc 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.hdfs;
+package org.apache.camel.component.hdfs.integration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.hdfs.HdfsTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
 import org.apache.hadoop.conf.Configuration;
@@ -56,7 +57,7 @@ import org.junit.Test;
 import static org.apache.hadoop.io.SequenceFile.CompressionType;
 import static org.hamcrest.CoreMatchers.equalTo;
 
-public class HdfsConsumerTest extends HdfsTestSupport {
+public class HdfsConsumerIntegrationTest extends HdfsTestSupport {
     private static final int ITERATIONS = 200;
 
     @Override
diff --git a/components/camel-hdfs/src/test/resources/hdfs/normal_file.txt b/components/camel-hdfs/src/test/resources/hdfs/normal_file.txt
new file mode 100644
index 0000000..6cedf63
--- /dev/null
+++ b/components/camel-hdfs/src/test/resources/hdfs/normal_file.txt
@@ -0,0 +1,28 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam eget fermentum arcu, vel dignissim ipsum.
+Nam nisl magna, euismod quis fringilla ut, euismod sed lectus. Etiam commodo tincidunt libero, sit amet sagittis nunc placerat vitae.
+Donec quis odio eget nibh lobortis fermentum. Aenean a mattis odio. Proin faucibus volutpat volutpat. Sed ut tempor lacus, sit amet faucibus quam.
+Curabitur risus magna, placerat ac lectus vitae, sollicitudin porttitor nibh.
+
+Aliquam eros eros, ornare sed commodo ac, ullamcorper eu ante. Vestibulum vel orci eu leo aliquet ultricies.
+Nam vitae mi tellus. Vivamus tempor nec arcu a vulputate. Aliquam ut mollis lacus, id vestibulum elit. Donec sit amet nunc laoreet,
+feugiat sem a, volutpat eros. Sed id leo a sapien ultricies euismod.
+
+Donec quis justo diam. Etiam enim nulla, varius id mi et, auctor dictum velit. Interdum et malesuada fames ac
+ante ipsum primis in faucibus. Sed accumsan placerat justo sit amet pellentesque. Fusce id rhoncus dui, imperdiet malesuada felis.
+Morbi ornare nisl sit amet lacus fringilla suscipit. Vestibulum maximus elit id dui hendrerit fermentum.
+Mauris varius, velit eget posuere lacinia, massa urna convallis neque, tristique pharetra lorem dolor et lacus.
+Integer volutpat mi dolor. Praesent sit amet est pretium ipsum vulputate aliquet. Aliquam eu diam ut felis aliquam vulputate.
+
+Nulla faucibus vulputate lobortis. Quisque blandit tellus et ligula vulputate consequat. In velit metus, lobortis ac
+accumsan non, sollicitudin at enim. Nam commodo tellus ex, vel tempor lectus luctus id. Nam nec sapien vel eros luctus
+convallis sit amet sed velit. Cras porttitor sapien sapien, ut placerat ex commodo nec. Fusce fermentum sed tortor a hendrerit.
+Nullam aliquet urna tellus, vel luctus sem convallis a. Cras placerat quam mauris, eget rhoncus velit venenatis ut. Aliquam semper
+arcu sit amet leo tempor, in imperdiet nulla vestibulum. Vivamus eget tempus ante. Donec vel massa et quam rhoncus laoreet. Integer
+pellentesque neque lacus, a tempor risus ultricies rhoncus.
+
+Integer nec justo varius eros condimentum porta. Sed neque ipsum, sagittis quis aliquam in, eleifend at massa. Sed id nisl accumsan,
+pretium erat quis, auctor nisl. Vestibulum lacinia euismod ultricies. Nulla aliquam turpis nunc, vitae pulvinar
+felis luctus tempor. Integer aliquam massa et varius aliquam. In non lacus sagittis est pretium semper sit amet id ipsum.
+Pellentesque euismod interdum scelerisque. Pellentesque ac pulvinar nunc, id gravida eros. Nulla posuere nisl tempus,
+ultricies dolor eu, dictum erat. Nunc a semper odio, at egestas felis. Proin dictum sollicitudin bibendum. Maecenas nec
+pretium arcu, id lobortis est.
\ No newline at end of file
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
index c3aeb51..907e582 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
@@ -316,6 +316,35 @@ public interface HdfsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Sets the download method to use when not using a local working
+         * directory. If set to true, the remote files are streamed to the route
+         * as they are read. When set to false, the remote files are loaded into
+         * memory before being sent into the route.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default HdfsEndpointConsumerBuilder streamDownload(
+                boolean streamDownload) {
+            doSetProperty("streamDownload", streamDownload);
+            return this;
+        }
+        /**
+         * Sets the download method to use when not using a local working
+         * directory. If set to true, the remote files are streamed to the route
+         * as they are read. When set to false, the remote files are loaded into
+         * memory before being sent into the route.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Group: consumer
+         */
+        default HdfsEndpointConsumerBuilder streamDownload(String streamDownload) {
+            doSetProperty("streamDownload", streamDownload);
+            return this;
+        }
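
A hedged usage sketch for the new option (URI and destination are illustrative);
the plain-URI form below is equivalent to calling streamDownload(true) on this
builder:

    from("hdfs://localhost/data?fileSystemType=HDFS&streamDownload=true")
        .to("mock:result");
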
+        /**
          * The number of subsequent error polls (failed due to some error) that
          * should happen before the backoffMultiplier should kick in.
          *