You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/04 07:49:03 UTC

[camel] branch master updated: CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e7147  CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310)
e4e7147 is described below

commit e4e71473d62e378adce63d6a76a891f379b6affe
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Mon Nov 4 08:48:51 2019 +0100

    CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310)
    
    * hdfs stream
    
    * hdfs stream
    
    * Migrate towards a more hdfs object based design
    
    * Migrate towards a more hdfs object based design
    
    * CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile
---
 .../camel/component/hdfs/DefaultHdfsFile.java      |  9 +--
 .../component/hdfs/HdfsArrayFileTypeHandler.java   | 63 +++++++++--------
 .../component/hdfs/HdfsBloommapFileHandler.java    | 74 ++++++++++----------
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 12 +---
 .../org/apache/camel/component/hdfs/HdfsFile.java  | 12 ++--
 .../apache/camel/component/hdfs/HdfsFileType.java  |  6 +-
 .../camel/component/hdfs/HdfsInputStream.java      | 18 ++---
 .../camel/component/hdfs/HdfsMapFileHandler.java   | 72 ++++++++++----------
 .../component/hdfs/HdfsNormalFileHandler.java      | 76 +++++++++++++--------
 .../camel/component/hdfs/HdfsOutputStream.java     |  6 +-
 .../apache/camel/component/hdfs/HdfsProducer.java  |  5 +-
 .../component/hdfs/HdfsSequenceFileHandler.java    | 78 +++++++++++-----------
 .../camel/component/hdfs/HdfsConsumerTest.java     | 68 +++++++++++++++++++
 13 files changed, 293 insertions(+), 206 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 76942c8..cc0845e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -24,7 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
@@ -37,7 +38,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-abstract class DefaultHdfsFile implements HdfsFile {
+abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> implements HdfsFile<T, U, Object, Object> {
 
     protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
         long numBytes = 0;
@@ -61,13 +62,13 @@ abstract class DefaultHdfsFile implements HdfsFile {
         return numBytes;
     }
 
-    protected final Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) {
+    protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
         Class<?> objCls = obj == null ? null : obj.getClass();
         HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
         if (objWritableFactory == null) {
             objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
         }
-        return objWritableFactory.create(obj, typeConverter, size);
+        return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
     }
 
     protected final Object getObject(Writable writable, Holder<Integer> size) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 35bb5a9..2a0e68c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -16,72 +16,71 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsArrayFileTypeHandler extends DefaultHdfsFile {
+class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFile.Reader> {
 
+    @SuppressWarnings("rawtypes")
     @Override
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+    public ArrayFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
-            Holder<Integer> valueSize = new Holder<>();
-            Writable valueWritable = getWritable(value, typeConverter, valueSize);
-            ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
-            return valueSize.value;
-        } catch (Exception ex) {
+            ArrayFile.Writer rout;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+                    endpointConfig.getCompressionType(), () -> { });
+            return rout;
+        } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         }
     }
 
     @Override
-    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
         try {
-            ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn();
             Holder<Integer> valueSize = new Holder<>();
-            Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
-            if (reader.next(valueWritable) != null) {
-                value.value = getObject(valueWritable, valueSize);
-                return valueSize.value;
-            } else {
-                return 0;
-            }
+            Writable valueWritable = getWritable(value, exchange, valueSize);
+            ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
+            return valueSize.value;
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+    public ArrayFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
-            Closeable rout;
+            ArrayFile.Reader rin;
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
-            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                    endpointConfig.getCompressionType(), () -> { });
-            return rout;
+            rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
+            return rin;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         }
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
-            Closeable rin;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
-            return rin;
-        } catch (IOException ex) {
+            ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn();
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+            if (reader.next(valueWritable) != null) {
+                value.value = getObject(valueWritable, valueSize);
+                return valueSize.value;
+            } else {
+                return 0;
+            }
+        } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index 422e4e6..0b1f907 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -16,11 +16,10 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BloomMapFile;
@@ -29,15 +28,35 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloommapFileHandler extends DefaultHdfsFile {
+class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public BloomMapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            BloomMapFile.Writer rout;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+                    MapFile.Writer.valueClass(valueWritableClass),
+                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+                    MapFile.Writer.progressable(() -> {
+                    }));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
 
     @Override
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
         try {
             Holder<Integer> keySize = new Holder<>();
-            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Writable keyWritable = getWritable(key, exchange, keySize);
             Holder<Integer> valueSize = new Holder<>();
-            Writable valueWritable = getWritable(value, typeConverter, valueSize);
+            Writable valueWritable = getWritable(value, exchange, valueSize);
             ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
             return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
@@ -46,6 +65,18 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile {
     }
 
     @Override
+    public BloomMapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            BloomMapFile.Reader rin;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
     public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
         try {
             MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn();
@@ -65,35 +96,4 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile {
         }
     }
 
-    @SuppressWarnings("rawtypes")
-    @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rout;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
-            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
-            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
-                    MapFile.Writer.valueClass(valueWritableClass),
-                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
-            return rout;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
-
-    @Override
-    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rin;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
-            return rin;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 7b3f4e4..4998d7a 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -29,6 +29,7 @@ import javax.security.auth.login.Configuration;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.ScheduledPollConsumer;
 import org.apache.camel.util.IOHelper;
 import org.apache.commons.lang.StringUtils;
@@ -148,15 +149,8 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         Holder<Object> key = new Holder<>();
         Holder<Object> value = new Holder<>();
 
-        if (this.endpointConfig.isStreamDownload()) {
-            key.value = null;
-            value.value = inputStream;
-            // use the input stream as the body
+        while (inputStream.next(key, value) >= 0) {
             processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
-        } else {
-            while (inputStream.next(key, value) >= 0) {
-                processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
-            }
         }
     }
 
@@ -201,7 +195,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
                     return false;
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new RuntimeCamelException(e);
             }
         }
         return true;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
index 1e2d63f..9979adf 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
@@ -18,16 +18,16 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
 
-interface HdfsFile {
+interface HdfsFile<T extends Closeable, U extends Closeable, K, V> {
 
-    Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
+    T createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
 
-    long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter);
+    long append(HdfsOutputStream hdfsOutputStream, K key, V value, Exchange exchange);
 
-    Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
+    U createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
 
-    long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
+    long next(HdfsInputStream hdfsInputStream, Holder<K> key, Holder<V> value);
 
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 7d4a239..430e20c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
 
 public enum HdfsFileType {
 
@@ -38,8 +38,8 @@ public enum HdfsFileType {
         return this.file.createOutputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
-        return this.file.append(hdfsOutputStream, key, value, typeConverter);
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
+        return this.file.append(hdfsOutputStream, key, value, exchange);
     }
 
     public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 8f12aef..68c22f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.camel.RuntimeCamelException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
@@ -30,6 +31,7 @@ public class HdfsInputStream implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(HdfsInputStream.class);
 
     private HdfsFileType fileType;
+    private HdfsInfo info;
     private String actualPath;
     private String suffixedPath;
     private String suffixedReadPath;
@@ -39,7 +41,7 @@ public class HdfsInputStream implements Closeable {
     private final AtomicLong numOfReadBytes = new AtomicLong(0L);
     private final AtomicLong numOfReadMessages = new AtomicLong(0L);
 
-    private HdfsConfiguration config;
+    private boolean streamDownload;
 
     protected HdfsInputStream() {
     }
@@ -49,7 +51,6 @@ public class HdfsInputStream implements Closeable {
      * @param hdfsPath
      * @param hdfsInfoFactory
      * @return
-     * @throws IOException
      */
     public static HdfsInputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
@@ -59,18 +60,18 @@ public class HdfsInputStream implements Closeable {
         iStream.suffixedPath = iStream.actualPath + '.' + endpointConfig.getOpenedSuffix();
         iStream.suffixedReadPath = iStream.actualPath + '.' + endpointConfig.getReadSuffix();
         iStream.chunkSize = endpointConfig.getChunkSize();
+        iStream.streamDownload = endpointConfig.isStreamDownload();
         try {
-            HdfsInfo info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath);
-            if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
+            iStream.info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath);
+            if (iStream.info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
                 iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, hdfsInfoFactory);
                 iStream.opened = true;
-                iStream.config = endpointConfig;
             } else {
                 LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
                 iStream = null;
             }
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new RuntimeCamelException(e);
         }
 
         return iStream;
@@ -80,8 +81,6 @@ public class HdfsInputStream implements Closeable {
     public final void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(in);
-            HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config);
-            HdfsInfo info = hdfsInfoFactory.newHdfsInfo(actualPath);
             info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
             opened = false;
         }
@@ -133,4 +132,7 @@ public class HdfsInputStream implements Closeable {
         return opened;
     }
 
+    public boolean isStreamDownload() {
+        return streamDownload;
+    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index 1c19162..b63463d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -16,11 +16,10 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
@@ -28,15 +27,34 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsMapFileHandler extends DefaultHdfsFile {
+class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader> {
 
     @Override
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+    @SuppressWarnings("rawtypes")
+    public MapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            MapFile.Writer rout;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+                    MapFile.Writer.progressable(() -> {
+                    }));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
         try {
             Holder<Integer> keySize = new Holder<>();
-            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Writable keyWritable = getWritable(key, exchange, keySize);
             Holder<Integer> valueSize = new Holder<>();
-            Writable valueWritable = getWritable(value, typeConverter, valueSize);
+            Writable valueWritable = getWritable(value, exchange, valueSize);
             ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
             return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
@@ -45,6 +63,18 @@ class HdfsMapFileHandler extends DefaultHdfsFile {
     }
 
     @Override
+    public MapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            MapFile.Reader rin;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
     public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
             MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn();
@@ -64,34 +94,4 @@ class HdfsMapFileHandler extends DefaultHdfsFile {
         }
     }
 
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rout;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
-            Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
-            Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
-                    MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
-            return rout;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
-
-    @Override
-    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rin;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
-            return rin;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 952bde8..e17dd1d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -17,15 +17,15 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,22 +33,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
-class HdfsNormalFileHandler extends DefaultHdfsFile {
+class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
+
+    private boolean consumed;
 
     @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+    public OutputStream createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
-            FSDataOutputStream rout;
+            OutputStream outputStream;
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             if (endpointConfig.isAppend()) {
-                rout = hdfsInfo.getFileSystem().append(
+                outputStream = hdfsInfo.getFileSystem().append(
                         hdfsInfo.getPath(),
                         endpointConfig.getBufferSize(),
                     () -> { }
                 );
             } else {
-                rout = hdfsInfo.getFileSystem().create(
+                outputStream = hdfsInfo.getFileSystem().create(
                         hdfsInfo.getPath(),
                         endpointConfig.isOverwrite(),
                         endpointConfig.getBufferSize(),
@@ -57,37 +59,37 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
                     () -> { }
                 );
             }
-            return rout;
+            return outputStream;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         }
     }
 
     @Override
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
-        InputStream is = null;
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
+        InputStream inputStream = null;
         try {
-            is = typeConverter.convertTo(InputStream.class, value);
-            return copyBytes(is, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
+            inputStream = exchange.getContext().getTypeConverter().convertTo(InputStream.class, exchange, value);
+            return copyBytes(inputStream, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         } finally {
-            IOHelper.close(is);
+            IOHelper.close(inputStream);
         }
     }
 
     @Override
-    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+    public InputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
         try {
-            Closeable rin;
+            InputStream inputStream;
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             if (endpointConfig.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
                 HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-                rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+                inputStream = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
             } else {
-                rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig));
+                inputStream = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig));
             }
-            return rin;
+            return inputStream;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -95,19 +97,40 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
 
     @Override
     public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+        if (hdfsInputStream.isStreamDownload()) {
+            return nextAsWrappedStream(hdfsInputStream, key, value);
+        } else {
+            return nextAsOutputStream(hdfsInputStream, key, value);
+        }
+    }
+
+    private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+        InputStream inputStream = (InputStream) hdfsInputStream.getIn();
+        key.value = null;
+        value.value = inputStream;
+
+        if (consumed) {
+            return 0;
+        } else {
+            consumed = true;
+            return 1;
+        }
+    }
+
+    private long nextAsOutputStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
             byte[] buf = new byte[hdfsInputStream.getChunkSize()];
             int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
             if (bytesRead >= 0) {
-                bos.write(buf, 0, bytesRead);
+                outputStream.write(buf, 0, bytesRead);
                 key.value = null;
-                value.value = bos;
+                value.value = outputStream;
                 return bytesRead;
             } else {
                 key.value = null;
                 // indication that we may have read from empty file
-                value.value = bos;
+                value.value = outputStream;
                 return 0;
             }
         } catch (IOException ex) {
@@ -117,18 +140,17 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
 
     private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
         try {
-            String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
+            String fileName = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
 
             // [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile
-
             File outputDest;
             try {
                 // First trying: Files.createTempFile
-                outputDest = Files.createTempFile(fname, ".hdfs").toFile();
+                outputDest = Files.createTempFile(fileName, ".hdfs").toFile();
 
             } catch (Exception ex) {
                 // Now trying: File.createTempFile
-                outputDest = File.createTempFile(fname, ".hdfs");
+                outputDest = File.createTempFile(fileName, ".hdfs");
             }
 
             if (outputDest.exists()) {
@@ -150,7 +172,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
                 return outputDest;
             }
 
-            return new File(outputDest, fname);
+            return new File(outputDest, fileName);
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
         }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index b93b93e..c192d8c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
@@ -84,10 +84,10 @@ public class HdfsOutputStream implements Closeable {
         }
     }
 
-    public void append(Object key, Object value, TypeConverter typeConverter) {
+    public void append(Object key, Object value, Exchange exchange) {
         try {
             busy.set(true);
-            long nb = fileType.append(this, key, value, typeConverter);
+            long nb = fileType.append(this, key, value, exchange);
             numOfWrittenBytes.addAndGet(nb);
             numOfWrittenMessages.incrementAndGet();
             lastAccess.set(System.currentTimeMillis());
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 8f407d8..0877eef 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StringHelper;
@@ -114,7 +115,7 @@ public class HdfsProducer extends DefaultProducer {
         } catch (Exception e) {
             log.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
             log.debug("", e);
-            throw new RuntimeException(e);
+            throw new RuntimeCamelException(e);
         } finally {
             HdfsComponent.setJAASConfiguration(auth);
         }
@@ -211,7 +212,7 @@ public class HdfsProducer extends DefaultProducer {
 
         String path = oStream.getActualPath();
         log.trace("Writing body to hdfs-file {}", path);
-        oStream.append(key, body, exchange.getContext().getTypeConverter());
+        oStream.append(key, body, exchange);
 
         idle.set(false);
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 936a2a8..d755ebc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -16,25 +16,44 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsSequenceFileHandler extends DefaultHdfsFile {
+class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, SequenceFile.Reader> {
 
     @Override
-    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+    public SequenceFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            SequenceFile.Writer rout;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+            Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+            Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+                    SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+                    SequenceFile.Writer.progressable(() -> {
+                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
         try {
             Holder<Integer> keySize = new Holder<>();
-            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Writable keyWritable = getWritable(key, exchange, keySize);
             Holder<Integer> valueSize = new Holder<>();
-            Writable valueWritable = getWritable(value, typeConverter, valueSize);
+            Writable valueWritable = getWritable(value, exchange, valueSize);
             SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
@@ -45,9 +64,21 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile {
     }
 
     @Override
-    public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+    public SequenceFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+        try {
+            SequenceFile.Reader rin;
+            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+            rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         try {
-            SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn();
+            SequenceFile.Reader reader = (SequenceFile.Reader) hdfsInputStream.getIn();
             Holder<Integer> keySize = new Holder<>();
             Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
             Holder<Integer> valueSize = new Holder<>();
@@ -64,35 +95,4 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile {
         }
     }
 
-    @Override
-    public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rout;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
-            Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
-            Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
-                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
-                    SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    SequenceFile.Writer.progressable(() -> {
-                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
-            return rout;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
-
-    @Override
-    public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
-        try {
-            Closeable rin;
-            HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
-            rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
-            return rin;
-        } catch (IOException ex) {
-            throw new RuntimeCamelException(ex);
-        }
-    }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index cc45053..c744901 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
@@ -27,11 +31,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_OPENED_SUFFIX;
 import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_READ_SUFFIX;
 import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -129,6 +136,7 @@ public class HdfsConsumerTest {
         when(endpointConfig.getOwner()).thenReturn("spiderman");
         when(endpointConfig.isConnectOnStartup()).thenReturn(true);
         when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+        when(endpointConfig.getChunkSize()).thenReturn(100 * 1000);
         when(endpoint.getCamelContext()).thenReturn(context);
         when(endpoint.createExchange()).thenReturn(new DefaultExchange(context));
         when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
@@ -149,6 +157,8 @@ public class HdfsConsumerTest {
         when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
         when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
 
+        ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+
         underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
 
         // when
@@ -156,6 +166,64 @@ public class HdfsConsumerTest {
 
         // then
         assertThat(actual, is(1));
+        verify(processor, times(1)).process(exchangeCaptor.capture());
+        Exchange exchange = exchangeCaptor.getValue();
+        assertThat(exchange, notNullValue());
+
+        ByteArrayOutputStream body = exchange.getIn().getBody(ByteArrayOutputStream.class);
+        assertThat(body, notNullValue());
+        assertThat(body.toString(), startsWith("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam eget fermentum arcu, vel dignissim ipsum."));
+
+    }
+
+    @Test
+    public void doPollFromExistingLocalFileWithStreamDownload() throws Exception {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+        when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+        when(endpointConfig.getPath()).thenReturn(hdfsPath);
+        when(endpointConfig.getOwner()).thenReturn("spiderman");
+        when(endpointConfig.isConnectOnStartup()).thenReturn(true);
+        when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+        when(endpointConfig.getChunkSize()).thenReturn(100 * 1000);
+        when(endpointConfig.isStreamDownload()).thenReturn(true);
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(endpoint.createExchange()).thenReturn(new DefaultExchange(context));
+        when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
+
+        when(fileSystem.isFile(any(Path.class))).thenReturn(true);
+
+        FileStatus[] fileStatuses = new FileStatus[1];
+        FileStatus fileStatus = mock(FileStatus.class);
+        fileStatuses[0] = fileStatus;
+        when(fileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(fileStatus.getPath()).thenReturn(new Path(hdfsPath));
+        when(fileStatus.isFile()).thenReturn(true);
+        when(fileStatus.isDirectory()).thenReturn(false);
+        when(fileStatus.getOwner()).thenReturn("spiderman");
+
+        String normalFile = CWD.getAbsolutePath() + "/src/test/resources/hdfs/normal_file.txt";
+        FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockDataInputStream(normalFile));
+        when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
+        when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+
+        ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+
+        underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
+
+        // when
+        int actual = underTest.doPoll();
+
+        // then
+        assertThat(actual, is(1));
+        verify(processor, times(1)).process(exchangeCaptor.capture());
+        Exchange exchange = exchangeCaptor.getValue();
+        assertThat(exchange, notNullValue());
+
+        InputStream body = (InputStream) exchange.getIn().getBody();
+        assertThat(body, notNullValue());
+
     }
 
 }
\ No newline at end of file