You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/22 08:50:46 UTC
[camel] branch master updated: CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5b60c2e CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
5b60c2e is described below
commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Fri Nov 22 09:50:34 2019 +0100
CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
CAMEL-14146 - camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter
---
.../camel/component/hdfs/DefaultHdfsFile.java | 13 ++--
.../component/hdfs/HdfsArrayFileTypeHandler.java | 16 ++--
...leHandler.java => HdfsBloomMapFileHandler.java} | 19 +++--
.../apache/camel/component/hdfs/HdfsConsumer.java | 86 ++++++++++++++--------
.../apache/camel/component/hdfs/HdfsFileType.java | 4 +-
.../camel/component/hdfs/HdfsInputStream.java | 69 +++++++++++++++--
.../camel/component/hdfs/HdfsMapFileHandler.java | 18 +++--
.../component/hdfs/HdfsNormalFileHandler.java | 9 +--
.../component/hdfs/HdfsSequenceFileHandler.java | 24 +++---
.../component/hdfs/HdfsWritableFactories.java | 44 +++++------
.../org/apache/camel/component/hdfs/Holder.java | 10 ++-
.../camel/component/hdfs/HdfsInputStreamTest.java | 7 ++
12 files changed, 217 insertions(+), 102 deletions(-)
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index cc0845e..04f126d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.camel.Exchange;
import org.apache.camel.util.IOHelper;
@@ -63,11 +64,8 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
}
protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
- Class<?> objCls = obj == null ? null : obj.getClass();
- HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
- if (objWritableFactory == null) {
- objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
- }
+ Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
+ HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
}
@@ -97,7 +95,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
- writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
+ writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
}
static {
@@ -112,4 +110,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
}
}
+
+ private static final class UnknownType {
+ }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 2a0e68c..4db8bd7 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,8 +36,14 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
- endpointConfig.getCompressionType(), () -> { });
+ rout = new ArrayFile.Writer(
+ hdfsInfo.getConfiguration(),
+ hdfsInfo.getFileSystem(),
+ hdfsPath,
+ valueWritableClass,
+ endpointConfig.getCompressionType(),
+ () -> { }
+ );
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -50,7 +56,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
- return valueSize.value;
+ return valueSize.getValue();
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -75,8 +81,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(valueWritable) != null) {
- value.value = getObject(valueWritable, valueSize);
- return valueSize.value;
+ value.setValue(getObject(valueWritable, valueSize));
+ return valueSize.getValue();
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
index 0b1f907..44301ae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
@SuppressWarnings("rawtypes")
@Override
@@ -39,11 +39,14 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+ rout = new BloomMapFile.Writer(
+ hdfsInfo.getConfiguration(),
+ new Path(hdfsPath),
+ MapFile.Writer.keyClass(keyWritableClass),
MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> {
- }));
+ MapFile.Writer.progressable(() -> { })
+ );
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -58,7 +61,7 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
- return Long.sum(keySize.value, valueSize.value);
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -85,9 +88,9 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.value = getObject(keyWritable, keySize);
- value.value = getObject(valueWritable, valueSize);
- return Long.sum(keySize.value, valueSize.value);
+ key.setValue(getObject(keyWritable, keySize));
+ value.setValue(getObject(valueWritable, valueSize));
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index affad6c..8e105cd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,11 +18,13 @@ package org.apache.camel.component.hdfs;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import javax.security.auth.login.Configuration;
@@ -127,54 +129,68 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
}
private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
- final AtomicInteger messageCount = new AtomicInteger(0);
+ final AtomicInteger totalMessageCount = new AtomicInteger(0);
- Arrays.stream(fileStatuses)
+ List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
.filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
.filter(this::hasMatchingOwner)
.limit(endpointConfig.getMaxMessagesPerPoll())
- .map(this::createInputStream)
+ .map(this::asHdfsFile)
.filter(Objects::nonNull)
- .forEach(hdfsInputStream -> {
- try {
- processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
- } finally {
- IOHelper.close(hdfsInputStream, "input stream", log);
- }
- });
+ .collect(Collectors.toList());
- return messageCount.get();
+ log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+
+ for (int i = 0; i < hdfsFiles.size(); i++) {
+ HdfsInputStream hdfsFile = hdfsFiles.get(i);
+ try {
+ int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
+ log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
+ log.debug("File [{}] was split to [{}] messages.", i, messageCount);
+ } finally {
+ IOHelper.close(hdfsFile, "hdfs file", log);
+ }
+ }
+
+ return totalMessageCount.get();
}
- private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
- Holder<Object> key = new Holder<>();
- Holder<Object> value = new Holder<>();
+ private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
+ final AtomicInteger messageCount = new AtomicInteger(0);
+ Holder<Object> currentKey = new Holder<>();
+ Holder<Object> currentValue = new Holder<>();
- while (inputStream.next(key, value) >= 0) {
- processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
+ while (hdfsFile.next(currentKey, currentValue) >= 0) {
+ processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
+ messageCount.incrementAndGet();
}
+
+ return messageCount.get();
}
- private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
+ private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
Exchange exchange = this.getEndpoint().createExchange();
Message message = exchange.getIn();
- String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
+ String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
message.setHeader(Exchange.FILE_NAME, fileName);
message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
- message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
- if (key.value != null) {
- message.setHeader(HdfsHeader.KEY.name(), key.value);
+ message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
+ if (key.getValue() != null) {
+ message.setHeader(HdfsHeader.KEY.name(), key.getValue());
}
- if (inputStream.getNumOfReadBytes() >= 0) {
- message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
+ if (hdfsFile.getNumOfReadBytes() >= 0) {
+ message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
}
- message.setBody(value.value);
+ message.setBody(value.getValue());
+
+ updateNewExchange(exchange, messageCount.get(), hdfsFile);
- log.debug("Processing file {}", fileName);
+ log.debug("Processing file [{}]", fileName);
try {
processor.process(exchange);
+ totalMessageCount.incrementAndGet();
} catch (Exception e) {
exchange.setException(e);
}
@@ -183,9 +199,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
if (exchange.getException() != null) {
getExceptionHandler().handleException(exchange.getException());
}
-
- int count = messageCount.incrementAndGet();
- log.debug("Processed [{}] files out of [{}]", count, totalFiles);
}
private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -212,7 +225,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
return true;
}
- private HdfsInputStream createInputStream(FileStatus fileStatus) {
+ private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
try {
this.rwLock.writeLock().lock();
return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -221,4 +234,19 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
}
}
+ protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
+ // do not share unit of work
+ exchange.setUnitOfWork(null);
+
+ exchange.setProperty(Exchange.SPLIT_INDEX, index);
+
+ if (hdfsFile.hasNext()) {
+ exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
+ } else {
+ exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
+ // streaming mode, so set total size when we are complete based on the index
+ exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
+ }
+ }
+
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 430e20c..8930587 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
NORMAL_FILE(new HdfsNormalFileHandler()),
SEQUENCE_FILE(new HdfsSequenceFileHandler()),
MAP_FILE(new HdfsMapFileHandler()),
- BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
+ BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
ARRAY_FILE(new HdfsArrayFileTypeHandler());
private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
}
- public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+ public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
return this.file.next(hdfsInputStream, key, value);
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 68c22f6..8673ac9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.RuntimeCamelException;
@@ -43,6 +45,8 @@ public class HdfsInputStream implements Closeable {
private boolean streamDownload;
+ private EntryHolder cachedNextEntry;
+
protected HdfsInputStream() {
}
@@ -92,20 +96,42 @@ public class HdfsInputStream implements Closeable {
* @param value
* @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
*/
- public final long next(Holder<Object> key, Holder<Object> value) {
+ public final long next(final Holder<Object> key, final Holder<Object> value) {
+ EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
+ cachedNextEntry = null;
+
+ key.setValue(nextEntry.getKey().getValue());
+ value.setValue(nextEntry.getValue().getValue());
+
+ return nextEntry.getByteCount();
+ }
+
+ private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
long nb = fileType.next(this, key, value);
// when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
// null value.value is the only indication that no (new) record/chunk was read
- if (nb == 0 && numOfReadMessages.get() > 0) {
+ if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
// we've read all chunks from file, which size is exact multiple the chunk size
- return -1;
- }
- if (value.value != null) {
+ nb = -1;
+ } else {
numOfReadBytes.addAndGet(nb);
numOfReadMessages.incrementAndGet();
- return nb;
}
- return -1;
+
+ return new EntryHolder(key, value, nb);
+ }
+
+ /**
+ */
+ public final boolean hasNext() {
+ if (Objects.isNull(cachedNextEntry)) {
+ Holder<Object> nextKey = new Holder<>();
+ Holder<Object> nextValue = new Holder<>();
+ long nextByteCount = next(nextKey, nextValue);
+ cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
+ }
+
+ return cachedNextEntry.hasNext();
}
public final long getNumOfReadBytes() {
@@ -135,4 +161,33 @@ public class HdfsInputStream implements Closeable {
public boolean isStreamDownload() {
return streamDownload;
}
+
+ private static class EntryHolder {
+
+ private long byteCount;
+ private Holder<Object> key;
+ private Holder<Object> value;
+
+ public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
+ this.key = key;
+ this.value = value;
+ this.byteCount = byteCount;
+ }
+
+ public Holder<Object> getKey() {
+ return key;
+ }
+
+ public Holder<Object> getValue() {
+ return value;
+ }
+
+ public Boolean hasNext() {
+ return byteCount >= 0;
+ }
+
+ public long getByteCount() {
+ return byteCount;
+ }
+ }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index b63463d..4924ddf 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,10 +38,14 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+ rout = new MapFile.Writer(
+ hdfsInfo.getConfiguration(),
+ new Path(hdfsPath),
+ MapFile.Writer.keyClass(keyWritableClass),
+ MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> {
- }));
+ MapFile.Writer.progressable(() -> { })
+ );
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -56,7 +60,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
- return Long.sum(keySize.value, valueSize.value);
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -83,9 +87,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.value = getObject(keyWritable, keySize);
- value.value = getObject(valueWritable, valueSize);
- return Long.sum(keySize.value, valueSize.value);
+ key.setValue(getObject(keyWritable, keySize));
+ value.setValue(getObject(valueWritable, valueSize));
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index e17dd1d..084e6d6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,8 +106,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
InputStream inputStream = (InputStream) hdfsInputStream.getIn();
- key.value = null;
- value.value = inputStream;
+ value.setValue(inputStream);
if (consumed) {
return 0;
@@ -124,13 +123,11 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
if (bytesRead >= 0) {
outputStream.write(buf, 0, bytesRead);
- key.value = null;
- value.value = outputStream;
+ value.setValue(outputStream);
return bytesRead;
} else {
- key.value = null;
// indication that we may have read from empty file
- value.value = outputStream;
+ value.setValue(outputStream);
return 0;
}
} catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index d755ebc..3c53dae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,12 +35,18 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
- SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
- SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+ rout = SequenceFile.createWriter(
+ hdfsInfo.getConfiguration(),
+ SequenceFile.Writer.file(hdfsInfo.getPath()),
+ SequenceFile.Writer.keyClass(keyWritableClass),
+ SequenceFile.Writer.valueClass(valueWritableClass),
+ SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+ SequenceFile.Writer.replication(endpointConfig.getReplication()),
+ SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- SequenceFile.Writer.progressable(() -> {
- }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
+ SequenceFile.Writer.progressable(() -> { }),
+ SequenceFile.Writer.metadata(new SequenceFile.Metadata())
+ );
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -57,7 +63,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
writer.append(keyWritable, valueWritable);
writer.sync();
- return Long.sum(keySize.value, valueSize.value);
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -84,9 +90,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.value = getObject(keyWritable, keySize);
- value.value = getObject(valueWritable, valueSize);
- return Long.sum(keySize.value, valueSize.value);
+ key.setValue(getObject(keyWritable, keySize));
+ value.setValue(getObject(valueWritable, valueSize));
+ return Long.sum(keySize.getValue(), valueSize.getValue());
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 07c5eb8..7f2de0f 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = 0;
+ size.setValue(0);
return NullWritable.get();
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = 0;
+ size.setValue(0);
return null;
}
}
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
ByteWritable writable = new ByteWritable();
writable.set(typeConverter.convertTo(Byte.class, value));
return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((ByteWritable) writable).get();
}
}
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
BooleanWritable writable = new BooleanWritable();
writable.set(typeConverter.convertTo(Boolean.class, value));
return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((BooleanWritable) writable).get();
}
}
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
BytesWritable writable = new BytesWritable();
ByteBuffer bb = (ByteBuffer) value;
writable.set(bb.array(), 0, bb.array().length);
- size.value = bb.array().length;
+ size.setValue(bb.array().length);
return writable;
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = ((BytesWritable) writable).getLength();
- ByteBuffer bb = ByteBuffer.allocate(size.value);
- bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
+ size.setValue(((BytesWritable) writable).getLength());
+ ByteBuffer bb = ByteBuffer.allocate(size.getValue());
+ bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
return bb;
}
}
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
DoubleWritable writable = new DoubleWritable();
writable.set(typeConverter.convertTo(Double.class, value));
return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((DoubleWritable) writable).get();
}
}
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
FloatWritable writable = new FloatWritable();
writable.set(typeConverter.convertTo(Float.class, value));
return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((FloatWritable) writable).get();
}
}
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
IntWritable writable = new IntWritable();
writable.set(typeConverter.convertTo(Integer.class, value));
return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((IntWritable) writable).get();
}
}
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
LongWritable writable = new LongWritable();
writable.set(typeConverter.convertTo(Long.class, value));
return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = SIZE;
+ size.setValue(SIZE);
return ((LongWritable) writable).get();
}
}
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
Text writable = new Text();
writable.set(typeConverter.convertTo(String.class, value));
- size.value = writable.getBytes().length;
+ size.setValue(writable.getBytes().length);
return writable;
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = ((Text) writable).getLength();
+ size.setValue(((Text) writable).getLength());
return writable.toString();
}
}
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
BytesWritable writable = new BytesWritable();
writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
- size.value = bos.toByteArray().length;
+ size.setValue(bos.toByteArray().length);
return writable;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.value = 0;
+ size.setValue(0);
return null;
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 8a02dcd..458f8ea 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
/**
* The value contained in the holder.
**/
- public T value;
+ private T value;
/**
* Creates a new holder with a <code>null</code> value.
@@ -37,4 +37,12 @@ public final class Holder<T> {
public Holder(T value) {
this.value = value;
}
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index ce6b334..2841a14 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
@@ -55,6 +56,7 @@ public class HdfsInputStreamTest {
fileSystem = mock(FileSystem.class);
configuration = mock(Configuration.class);
Path path = mock(Path.class);
+ FileStatus fileStatus = mock(FileStatus.class);
when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -63,6 +65,11 @@ public class HdfsInputStreamTest {
when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
when(hdfsInfo.getConfiguration()).thenReturn(configuration);
when(hdfsInfo.getPath()).thenReturn(path);
+
+ when(path.getFileSystem(configuration)).thenReturn(fileSystem);
+
+ when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
+ when(fileStatus.getLen()).thenReturn(1000L);
}
@Test