You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/22 08:51:46 UTC

[camel] branch master updated: Revert "CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)" (#3359)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 1080527  Revert "CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)" (#3359)
1080527 is described below

commit 10805279d9c95a127eff0c09215696421a9bc03e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Nov 22 09:51:36 2019 +0100

    Revert "CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)" (#3359)
    
    This reverts commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595.
---
 .../camel/component/hdfs/DefaultHdfsFile.java      | 13 ++--
 .../component/hdfs/HdfsArrayFileTypeHandler.java   | 16 ++--
 ...leHandler.java => HdfsBloommapFileHandler.java} | 19 ++---
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 86 ++++++++--------------
 .../apache/camel/component/hdfs/HdfsFileType.java  |  4 +-
 .../camel/component/hdfs/HdfsInputStream.java      | 69 ++---------------
 .../camel/component/hdfs/HdfsMapFileHandler.java   | 18 ++---
 .../component/hdfs/HdfsNormalFileHandler.java      |  9 ++-
 .../component/hdfs/HdfsSequenceFileHandler.java    | 24 +++---
 .../component/hdfs/HdfsWritableFactories.java      | 44 +++++------
 .../org/apache/camel/component/hdfs/Holder.java    | 10 +--
 .../camel/component/hdfs/HdfsInputStreamTest.java  |  7 --
 12 files changed, 102 insertions(+), 217 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 04f126d..cc0845e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,7 +24,6 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.IOHelper;
@@ -64,8 +63,11 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
     }
 
     protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
-        Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
-        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
+        Class<?> objCls = obj == null ? null : obj.getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
+        if (objWritableFactory == null) {
+            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
+        }
         return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
     }
 
@@ -95,7 +97,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
+            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
 
         static {
@@ -110,7 +112,4 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
     }
-
-    private static final class UnknownType {
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 4db8bd7..2a0e68c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,14 +36,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    hdfsInfo.getFileSystem(),
-                    hdfsPath,
-                    valueWritableClass,
-                    endpointConfig.getCompressionType(),
-            () -> { }
-            );
+            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+                    endpointConfig.getCompressionType(), () -> { });
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -56,7 +50,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
-            return valueSize.getValue();
+            return valueSize.value;
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -81,8 +75,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(valueWritable) != null) {
-                value.setValue(getObject(valueWritable, valueSize));
-                return valueSize.getValue();
+                value.value = getObject(valueWritable, valueSize);
+                return valueSize.value;
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index 44301ae..0b1f907 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -39,14 +39,11 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    new Path(hdfsPath),
-                    MapFile.Writer.keyClass(keyWritableClass),
+            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> { })
-            );
+                    MapFile.Writer.progressable(() -> {
+                    }));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -61,7 +58,7 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -88,9 +85,9 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 8e105cd..affad6c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,13 +18,11 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import javax.security.auth.login.Configuration;
 
@@ -129,68 +127,54 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     }
 
     private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
-        final AtomicInteger totalMessageCount = new AtomicInteger(0);
+        final AtomicInteger messageCount = new AtomicInteger(0);
 
-        List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
+        Arrays.stream(fileStatuses)
                 .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
                 .filter(this::hasMatchingOwner)
                 .limit(endpointConfig.getMaxMessagesPerPoll())
-                .map(this::asHdfsFile)
+                .map(this::createInputStream)
                 .filter(Objects::nonNull)
-                .collect(Collectors.toList());
-
-        log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+                .forEach(hdfsInputStream -> {
+                    try {
+                        processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
+                    } finally {
+                        IOHelper.close(hdfsInputStream, "input stream", log);
+                    }
+                });
 
-        for (int i = 0; i < hdfsFiles.size(); i++) {
-            HdfsInputStream hdfsFile = hdfsFiles.get(i);
-            try {
-                int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
-                log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
-                log.debug("File [{}] was split to [{}] messages.", i, messageCount);
-            } finally {
-                IOHelper.close(hdfsFile, "hdfs file", log);
-            }
-        }
-
-        return totalMessageCount.get();
+        return messageCount.get();
     }
 
-    private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
-        final AtomicInteger messageCount = new AtomicInteger(0);
-        Holder<Object> currentKey = new Holder<>();
-        Holder<Object> currentValue = new Holder<>();
+    private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
+        Holder<Object> key = new Holder<>();
+        Holder<Object> value = new Holder<>();
 
-        while (hdfsFile.next(currentKey, currentValue) >= 0) {
-            processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
-            messageCount.incrementAndGet();
+        while (inputStream.next(key, value) >= 0) {
+            processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
         }
-
-        return messageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
+    private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
         Exchange exchange = this.getEndpoint().createExchange();
         Message message = exchange.getIn();
-        String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
+        String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
         message.setHeader(Exchange.FILE_NAME, fileName);
         message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
-        message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
-        if (key.getValue() != null) {
-            message.setHeader(HdfsHeader.KEY.name(), key.getValue());
+        message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
+        if (key.value != null) {
+            message.setHeader(HdfsHeader.KEY.name(), key.value);
         }
 
-        if (hdfsFile.getNumOfReadBytes() >= 0) {
-            message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
+        if (inputStream.getNumOfReadBytes() >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
         }
 
-        message.setBody(value.getValue());
-
-        updateNewExchange(exchange, messageCount.get(), hdfsFile);
+        message.setBody(value.value);
 
-        log.debug("Processing file [{}]", fileName);
+        log.debug("Processing file {}", fileName);
         try {
             processor.process(exchange);
-            totalMessageCount.incrementAndGet();
         } catch (Exception e) {
             exchange.setException(e);
         }
@@ -199,6 +183,9 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         if (exchange.getException() != null) {
             getExceptionHandler().handleException(exchange.getException());
         }
+
+        int count = messageCount.incrementAndGet();
+        log.debug("Processed [{}] files out of [{}]", count, totalFiles);
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -225,7 +212,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         return true;
     }
 
-    private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
+    private HdfsInputStream createInputStream(FileStatus fileStatus) {
         try {
             this.rwLock.writeLock().lock();
             return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -234,19 +221,4 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
-        // do not share unit of work
-        exchange.setUnitOfWork(null);
-
-        exchange.setProperty(Exchange.SPLIT_INDEX, index);
-
-        if (hdfsFile.hasNext()) {
-            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
-        } else {
-            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
-            // streaming mode, so set total size when we are complete based on the index
-            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
-        }
-    }
-
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 8930587..430e20c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
     NORMAL_FILE(new HdfsNormalFileHandler()),
     SEQUENCE_FILE(new HdfsSequenceFileHandler()),
     MAP_FILE(new HdfsMapFileHandler()),
-    BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
+    BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
     ARRAY_FILE(new HdfsArrayFileTypeHandler());
 
     private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
         return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         return this.file.next(hdfsInputStream, key, value);
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 8673ac9..68c22f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.RuntimeCamelException;
@@ -45,8 +43,6 @@ public class HdfsInputStream implements Closeable {
 
     private boolean streamDownload;
 
-    private EntryHolder cachedNextEntry;
-
     protected HdfsInputStream() {
     }
 
@@ -96,42 +92,20 @@ public class HdfsInputStream implements Closeable {
      * @param value
      * @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
      */
-    public final long next(final Holder<Object> key, final Holder<Object> value) {
-        EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
-        cachedNextEntry = null;
-
-        key.setValue(nextEntry.getKey().getValue());
-        value.setValue(nextEntry.getValue().getValue());
-
-        return nextEntry.getByteCount();
-    }
-
-    private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
+    public final long next(Holder<Object> key, Holder<Object> value) {
         long nb = fileType.next(this, key, value);
         // when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
         // null value.value is the only indication that no (new) record/chunk was read
-        if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
+        if (nb == 0 && numOfReadMessages.get() > 0) {
             // we've read all chunks from file, which size is exact multiple the chunk size
-            nb = -1;
-        } else {
+            return -1;
+        }
+        if (value.value != null) {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
+            return nb;
         }
-
-        return new EntryHolder(key, value, nb);
-    }
-
-    /**
-     */
-    public final boolean hasNext() {
-        if (Objects.isNull(cachedNextEntry)) {
-            Holder<Object> nextKey = new Holder<>();
-            Holder<Object> nextValue = new Holder<>();
-            long nextByteCount = next(nextKey, nextValue);
-            cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
-        }
-
-        return cachedNextEntry.hasNext();
+        return -1;
     }
 
     public final long getNumOfReadBytes() {
@@ -161,33 +135,4 @@ public class HdfsInputStream implements Closeable {
     public boolean isStreamDownload() {
         return streamDownload;
     }
-
-    private static class EntryHolder {
-
-        private long byteCount;
-        private Holder<Object> key;
-        private Holder<Object> value;
-
-        public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
-            this.key = key;
-            this.value = value;
-            this.byteCount = byteCount;
-        }
-
-        public Holder<Object> getKey() {
-            return key;
-        }
-
-        public Holder<Object> getValue() {
-            return value;
-        }
-
-        public Boolean hasNext() {
-            return byteCount >= 0;
-        }
-
-        public long getByteCount() {
-            return byteCount;
-        }
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index 4924ddf..b63463d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,14 +38,10 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new MapFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    new Path(hdfsPath),
-                    MapFile.Writer.keyClass(keyWritableClass),
-                    MapFile.Writer.valueClass(valueWritableClass),
+            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> { })
-            );
+                    MapFile.Writer.progressable(() -> {
+                    }));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -60,7 +56,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -87,9 +83,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 084e6d6..e17dd1d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,7 +106,8 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
 
     private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         InputStream inputStream = (InputStream) hdfsInputStream.getIn();
-        value.setValue(inputStream);
+        key.value = null;
+        value.value = inputStream;
 
         if (consumed) {
             return 0;
@@ -123,11 +124,13 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
             int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
             if (bytesRead >= 0) {
                 outputStream.write(buf, 0, bytesRead);
-                value.setValue(outputStream);
+                key.value = null;
+                value.value = outputStream;
                 return bytesRead;
             } else {
+                key.value = null;
                 // indication that we may have read from empty file
-                value.setValue(outputStream);
+                value.value = outputStream;
                 return 0;
             }
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 3c53dae..d755ebc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,18 +35,12 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(
-                    hdfsInfo.getConfiguration(),
-                    SequenceFile.Writer.file(hdfsInfo.getPath()),
-                    SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass),
-                    SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
-                    SequenceFile.Writer.replication(endpointConfig.getReplication()),
-                    SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
                     SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    SequenceFile.Writer.progressable(() -> { }),
-                    SequenceFile.Writer.metadata(new SequenceFile.Metadata())
-            );
+                    SequenceFile.Writer.progressable(() -> {
+                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -63,7 +57,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -90,9 +84,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 7f2de0f..07c5eb8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return NullWritable.get();
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return null;
         }
     }
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             ByteWritable writable = new ByteWritable();
             writable.set(typeConverter.convertTo(Byte.class, value));
             return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((ByteWritable) writable).get();
         }
     }
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             BooleanWritable writable = new BooleanWritable();
             writable.set(typeConverter.convertTo(Boolean.class, value));
             return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((BooleanWritable) writable).get();
         }
     }
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
             BytesWritable writable = new BytesWritable();
             ByteBuffer bb = (ByteBuffer) value;
             writable.set(bb.array(), 0, bb.array().length);
-            size.setValue(bb.array().length);
+            size.value = bb.array().length;
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(((BytesWritable) writable).getLength());
-            ByteBuffer bb = ByteBuffer.allocate(size.getValue());
-            bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
+            size.value = ((BytesWritable) writable).getLength();
+            ByteBuffer bb = ByteBuffer.allocate(size.value);
+            bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
             return bb;
         }
     }
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             DoubleWritable writable = new DoubleWritable();
             writable.set(typeConverter.convertTo(Double.class, value));
             return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((DoubleWritable) writable).get();
         }
     }
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             FloatWritable writable = new FloatWritable();
             writable.set(typeConverter.convertTo(Float.class, value));
             return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((FloatWritable) writable).get();
         }
     }
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             IntWritable writable = new IntWritable();
             writable.set(typeConverter.convertTo(Integer.class, value));
             return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((IntWritable) writable).get();
         }
     }
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             LongWritable writable = new LongWritable();
             writable.set(typeConverter.convertTo(Long.class, value));
             return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((LongWritable) writable).get();
         }
     }
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
             Text writable = new Text();
             writable.set(typeConverter.convertTo(String.class, value));
-            size.setValue(writable.getBytes().length);
+            size.value = writable.getBytes().length;
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(((Text) writable).getLength());
+            size.value = ((Text) writable).getLength();
             return writable.toString();
         }
     }
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
                 BytesWritable writable = new BytesWritable();
                 writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
-                size.setValue(bos.toByteArray().length);
+                size.value = bos.toByteArray().length;
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return null;
         }
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 458f8ea..8a02dcd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
     /**
      * The value contained in the holder.
      **/
-    private T value;
+    public T value;
 
     /**
      * Creates a new holder with a <code>null</code> value.
@@ -37,12 +37,4 @@ public final class Holder<T> {
     public Holder(T value) {
         this.value = value;
     }
-
-    public T getValue() {
-        return value;
-    }
-
-    public void setValue(T value) {
-        this.value = value;
-    }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index 2841a14..ce6b334 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -56,7 +55,6 @@ public class HdfsInputStreamTest {
         fileSystem = mock(FileSystem.class);
         configuration = mock(Configuration.class);
         Path path = mock(Path.class);
-        FileStatus fileStatus = mock(FileStatus.class);
 
         when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
         when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -65,11 +63,6 @@ public class HdfsInputStreamTest {
         when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
         when(hdfsInfo.getConfiguration()).thenReturn(configuration);
         when(hdfsInfo.getPath()).thenReturn(path);
-
-        when(path.getFileSystem(configuration)).thenReturn(fileSystem);
-
-        when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
-        when(fileStatus.getLen()).thenReturn(1000L);
     }
 
     @Test