You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/22 08:50:46 UTC

[camel] branch master updated: CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b60c2e  CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
5b60c2e is described below

commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Fri Nov 22 09:50:34 2019 +0100

    CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
    
    CAMEL-14146 - camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter
---
 .../camel/component/hdfs/DefaultHdfsFile.java      | 13 ++--
 .../component/hdfs/HdfsArrayFileTypeHandler.java   | 16 ++--
 ...leHandler.java => HdfsBloomMapFileHandler.java} | 19 +++--
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 86 ++++++++++++++--------
 .../apache/camel/component/hdfs/HdfsFileType.java  |  4 +-
 .../camel/component/hdfs/HdfsInputStream.java      | 69 +++++++++++++++--
 .../camel/component/hdfs/HdfsMapFileHandler.java   | 18 +++--
 .../component/hdfs/HdfsNormalFileHandler.java      |  9 +--
 .../component/hdfs/HdfsSequenceFileHandler.java    | 24 +++---
 .../component/hdfs/HdfsWritableFactories.java      | 44 +++++------
 .../org/apache/camel/component/hdfs/Holder.java    | 10 ++-
 .../camel/component/hdfs/HdfsInputStreamTest.java  |  7 ++
 12 files changed, 217 insertions(+), 102 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index cc0845e..04f126d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.IOHelper;
@@ -63,11 +64,8 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
     }
 
     protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
-        Class<?> objCls = obj == null ? null : obj.getClass();
-        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
-        if (objWritableFactory == null) {
-            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
-        }
+        Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
         return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
     }
 
@@ -97,7 +95,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
+            writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
 
         static {
@@ -112,4 +110,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
     }
+
+    private static final class UnknownType {
+    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 2a0e68c..4db8bd7 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,8 +36,14 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                    endpointConfig.getCompressionType(), () -> { });
+            rout = new ArrayFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    hdfsInfo.getFileSystem(),
+                    hdfsPath,
+                    valueWritableClass,
+                    endpointConfig.getCompressionType(),
+            () -> { }
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -50,7 +56,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
-            return valueSize.value;
+            return valueSize.getValue();
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -75,8 +81,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(valueWritable) != null) {
-                value.value = getObject(valueWritable, valueSize);
-                return valueSize.value;
+                value.setValue(getObject(valueWritable, valueSize));
+                return valueSize.getValue();
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
index 0b1f907..44301ae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -39,11 +39,14 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+            rout = new BloomMapFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    new Path(hdfsPath),
+                    MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
+                    MapFile.Writer.progressable(() -> { })
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -58,7 +61,7 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -85,9 +88,9 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index affad6c..8e105cd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,11 +18,13 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import javax.security.auth.login.Configuration;
 
@@ -127,54 +129,68 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     }
 
     private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
-        final AtomicInteger messageCount = new AtomicInteger(0);
+        final AtomicInteger totalMessageCount = new AtomicInteger(0);
 
-        Arrays.stream(fileStatuses)
+        List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
                 .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
                 .filter(this::hasMatchingOwner)
                 .limit(endpointConfig.getMaxMessagesPerPoll())
-                .map(this::createInputStream)
+                .map(this::asHdfsFile)
                 .filter(Objects::nonNull)
-                .forEach(hdfsInputStream -> {
-                    try {
-                        processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
-                    } finally {
-                        IOHelper.close(hdfsInputStream, "input stream", log);
-                    }
-                });
+                .collect(Collectors.toList());
 
-        return messageCount.get();
+        log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+
+        for (int i = 0; i < hdfsFiles.size(); i++) {
+            HdfsInputStream hdfsFile = hdfsFiles.get(i);
+            try {
+                int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
+                log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
+                log.debug("File [{}] was split to [{}] messages.", i, messageCount);
+            } finally {
+                IOHelper.close(hdfsFile, "hdfs file", log);
+            }
+        }
+
+        return totalMessageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
-        Holder<Object> key = new Holder<>();
-        Holder<Object> value = new Holder<>();
+    private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
+        final AtomicInteger messageCount = new AtomicInteger(0);
+        Holder<Object> currentKey = new Holder<>();
+        Holder<Object> currentValue = new Holder<>();
 
-        while (inputStream.next(key, value) >= 0) {
-            processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
+        while (hdfsFile.next(currentKey, currentValue) >= 0) {
+            processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
+            messageCount.incrementAndGet();
         }
+
+        return messageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
+    private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
         Exchange exchange = this.getEndpoint().createExchange();
         Message message = exchange.getIn();
-        String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
+        String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
         message.setHeader(Exchange.FILE_NAME, fileName);
         message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
-        message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
-        if (key.value != null) {
-            message.setHeader(HdfsHeader.KEY.name(), key.value);
+        message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
+        if (key.getValue() != null) {
+            message.setHeader(HdfsHeader.KEY.name(), key.getValue());
         }
 
-        if (inputStream.getNumOfReadBytes() >= 0) {
-            message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
+        if (hdfsFile.getNumOfReadBytes() >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
         }
 
-        message.setBody(value.value);
+        message.setBody(value.getValue());
+
+        updateNewExchange(exchange, messageCount.get(), hdfsFile);
 
-        log.debug("Processing file {}", fileName);
+        log.debug("Processing file [{}]", fileName);
         try {
             processor.process(exchange);
+            totalMessageCount.incrementAndGet();
         } catch (Exception e) {
             exchange.setException(e);
         }
@@ -183,9 +199,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         if (exchange.getException() != null) {
             getExceptionHandler().handleException(exchange.getException());
         }
-
-        int count = messageCount.incrementAndGet();
-        log.debug("Processed [{}] files out of [{}]", count, totalFiles);
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -212,7 +225,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         return true;
     }
 
-    private HdfsInputStream createInputStream(FileStatus fileStatus) {
+    private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
         try {
             this.rwLock.writeLock().lock();
             return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -221,4 +234,19 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
     }
 
+    protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
+        // do not share unit of work
+        exchange.setUnitOfWork(null);
+
+        exchange.setProperty(Exchange.SPLIT_INDEX, index);
+
+        if (hdfsFile.hasNext()) {
+            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
+        } else {
+            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
+            // streaming mode, so set total size when we are complete based on the index
+            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
+        }
+    }
+
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 430e20c..8930587 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
     NORMAL_FILE(new HdfsNormalFileHandler()),
     SEQUENCE_FILE(new HdfsSequenceFileHandler()),
     MAP_FILE(new HdfsMapFileHandler()),
-    BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
+    BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
     ARRAY_FILE(new HdfsArrayFileTypeHandler());
 
     private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
         return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+    public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
         return this.file.next(hdfsInputStream, key, value);
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 68c22f6..8673ac9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.RuntimeCamelException;
@@ -43,6 +45,8 @@ public class HdfsInputStream implements Closeable {
 
     private boolean streamDownload;
 
+    private EntryHolder cachedNextEntry;
+
     protected HdfsInputStream() {
     }
 
@@ -92,20 +96,42 @@ public class HdfsInputStream implements Closeable {
      * @param value
      * @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
      */
-    public final long next(Holder<Object> key, Holder<Object> value) {
+    public final long next(final Holder<Object> key, final Holder<Object> value) {
+        EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
+        cachedNextEntry = null;
+
+        key.setValue(nextEntry.getKey().getValue());
+        value.setValue(nextEntry.getValue().getValue());
+
+        return nextEntry.getByteCount();
+    }
+
+    private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
         long nb = fileType.next(this, key, value);
         // when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
         // null value.value is the only indication that no (new) record/chunk was read
-        if (nb == 0 && numOfReadMessages.get() > 0) {
+        if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
             // we've read all chunks from file, which size is exact multiple the chunk size
-            return -1;
-        }
-        if (value.value != null) {
+            nb = -1;
+        } else {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
-            return nb;
         }
-        return -1;
+
+        return new EntryHolder(key, value, nb);
+    }
+
+    /**
+     */
+    public final boolean hasNext() {
+        if (Objects.isNull(cachedNextEntry)) {
+            Holder<Object> nextKey = new Holder<>();
+            Holder<Object> nextValue = new Holder<>();
+            long nextByteCount = next(nextKey, nextValue);
+            cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
+        }
+
+        return cachedNextEntry.hasNext();
     }
 
     public final long getNumOfReadBytes() {
@@ -135,4 +161,33 @@ public class HdfsInputStream implements Closeable {
     public boolean isStreamDownload() {
         return streamDownload;
     }
+
+    private static class EntryHolder {
+
+        private long byteCount;
+        private Holder<Object> key;
+        private Holder<Object> value;
+
+        public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
+            this.key = key;
+            this.value = value;
+            this.byteCount = byteCount;
+        }
+
+        public Holder<Object> getKey() {
+            return key;
+        }
+
+        public Holder<Object> getValue() {
+            return value;
+        }
+
+        public Boolean hasNext() {
+            return byteCount >= 0;
+        }
+
+        public long getByteCount() {
+            return byteCount;
+        }
+    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index b63463d..4924ddf 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,10 +38,14 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+            rout = new MapFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    new Path(hdfsPath),
+                    MapFile.Writer.keyClass(keyWritableClass),
+                    MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
+                    MapFile.Writer.progressable(() -> { })
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -56,7 +60,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -83,9 +87,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index e17dd1d..084e6d6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,8 +106,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
 
     private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         InputStream inputStream = (InputStream) hdfsInputStream.getIn();
-        key.value = null;
-        value.value = inputStream;
+        value.setValue(inputStream);
 
         if (consumed) {
             return 0;
@@ -124,13 +123,11 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
             int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
             if (bytesRead >= 0) {
                 outputStream.write(buf, 0, bytesRead);
-                key.value = null;
-                value.value = outputStream;
+                value.setValue(outputStream);
                 return bytesRead;
             } else {
-                key.value = null;
                 // indication that we may have read from empty file
-                value.value = outputStream;
+                value.setValue(outputStream);
                 return 0;
             }
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index d755ebc..3c53dae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,12 +35,18 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
-                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+            rout = SequenceFile.createWriter(
+                    hdfsInfo.getConfiguration(),
+                    SequenceFile.Writer.file(hdfsInfo.getPath()),
+                    SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass),
+                    SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()),
+                    SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
                     SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    SequenceFile.Writer.progressable(() -> {
-                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
+                    SequenceFile.Writer.progressable(() -> { }),
+                    SequenceFile.Writer.metadata(new SequenceFile.Metadata())
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -57,7 +63,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -84,9 +90,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 07c5eb8..7f2de0f 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return NullWritable.get();
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return null;
         }
     }
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             ByteWritable writable = new ByteWritable();
             writable.set(typeConverter.convertTo(Byte.class, value));
             return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((ByteWritable) writable).get();
         }
     }
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             BooleanWritable writable = new BooleanWritable();
             writable.set(typeConverter.convertTo(Boolean.class, value));
             return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((BooleanWritable) writable).get();
         }
     }
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
             BytesWritable writable = new BytesWritable();
             ByteBuffer bb = (ByteBuffer) value;
             writable.set(bb.array(), 0, bb.array().length);
-            size.value = bb.array().length;
+            size.setValue(bb.array().length);
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = ((BytesWritable) writable).getLength();
-            ByteBuffer bb = ByteBuffer.allocate(size.value);
-            bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
+            size.setValue(((BytesWritable) writable).getLength());
+            ByteBuffer bb = ByteBuffer.allocate(size.getValue());
+            bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
             return bb;
         }
     }
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             DoubleWritable writable = new DoubleWritable();
             writable.set(typeConverter.convertTo(Double.class, value));
             return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((DoubleWritable) writable).get();
         }
     }
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             FloatWritable writable = new FloatWritable();
             writable.set(typeConverter.convertTo(Float.class, value));
             return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((FloatWritable) writable).get();
         }
     }
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             IntWritable writable = new IntWritable();
             writable.set(typeConverter.convertTo(Integer.class, value));
             return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((IntWritable) writable).get();
         }
     }
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             LongWritable writable = new LongWritable();
             writable.set(typeConverter.convertTo(Long.class, value));
             return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((LongWritable) writable).get();
         }
     }
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
             Text writable = new Text();
             writable.set(typeConverter.convertTo(String.class, value));
-            size.value = writable.getBytes().length;
+            size.setValue(writable.getBytes().length);
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = ((Text) writable).getLength();
+            size.setValue(((Text) writable).getLength());
             return writable.toString();
         }
     }
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
                 BytesWritable writable = new BytesWritable();
                 writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
-                size.value = bos.toByteArray().length;
+                size.setValue(bos.toByteArray().length);
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return null;
         }
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 8a02dcd..458f8ea 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
     /**
      * The value contained in the holder.
      **/
-    public T value;
+    private T value;
 
     /**
      * Creates a new holder with a <code>null</code> value.
@@ -37,4 +37,12 @@ public final class Holder<T> {
     public Holder(T value) {
         this.value = value;
     }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index ce6b334..2841a14 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -55,6 +56,7 @@ public class HdfsInputStreamTest {
         fileSystem = mock(FileSystem.class);
         configuration = mock(Configuration.class);
         Path path = mock(Path.class);
+        FileStatus fileStatus = mock(FileStatus.class);
 
         when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
         when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -63,6 +65,11 @@ public class HdfsInputStreamTest {
         when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
         when(hdfsInfo.getConfiguration()).thenReturn(configuration);
         when(hdfsInfo.getPath()).thenReturn(path);
+
+        when(path.getFileSystem(configuration)).thenReturn(fileSystem);
+
+        when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
+        when(fileStatus.getLen()).thenReturn(1000L);
     }
 
     @Test