You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/22 08:51:27 UTC
[camel] 01/01: Revert "CAMEL-14146 : camel-hdfs - Consumer that
splits with ChunkSize should add similar header info like the Splitter
(#3354)"
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch revert-3354-CAMEL-14146-new
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0e4d2eced12ebfd55833f527695d3ba8e35af68f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Nov 22 09:51:18 2019 +0100
Revert "CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)"
This reverts commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595.
---
.../camel/component/hdfs/DefaultHdfsFile.java | 13 ++--
.../component/hdfs/HdfsArrayFileTypeHandler.java | 16 ++--
...leHandler.java => HdfsBloommapFileHandler.java} | 19 ++---
.../apache/camel/component/hdfs/HdfsConsumer.java | 86 ++++++++--------------
.../apache/camel/component/hdfs/HdfsFileType.java | 4 +-
.../camel/component/hdfs/HdfsInputStream.java | 69 ++---------------
.../camel/component/hdfs/HdfsMapFileHandler.java | 18 ++---
.../component/hdfs/HdfsNormalFileHandler.java | 9 ++-
.../component/hdfs/HdfsSequenceFileHandler.java | 24 +++---
.../component/hdfs/HdfsWritableFactories.java | 44 +++++------
.../org/apache/camel/component/hdfs/Holder.java | 10 +--
.../camel/component/hdfs/HdfsInputStreamTest.java | 7 --
12 files changed, 102 insertions(+), 217 deletions(-)
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 04f126d..cc0845e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,7 +24,6 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import org.apache.camel.Exchange;
import org.apache.camel.util.IOHelper;
@@ -64,8 +63,11 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
}
protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
- Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
- HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
+ Class<?> objCls = obj == null ? null : obj.getClass();
+ HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
+ if (objWritableFactory == null) {
+ objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
+ }
return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
}
@@ -95,7 +97,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
- writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
+ writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
}
static {
@@ -110,7 +112,4 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
}
}
-
- private static final class UnknownType {
- }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 4db8bd7..2a0e68c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,14 +36,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new ArrayFile.Writer(
- hdfsInfo.getConfiguration(),
- hdfsInfo.getFileSystem(),
- hdfsPath,
- valueWritableClass,
- endpointConfig.getCompressionType(),
- () -> { }
- );
+ rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+ endpointConfig.getCompressionType(), () -> { });
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -56,7 +50,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
- return valueSize.getValue();
+ return valueSize.value;
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -81,8 +75,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(valueWritable) != null) {
- value.setValue(getObject(valueWritable, valueSize));
- return valueSize.getValue();
+ value.value = getObject(valueWritable, valueSize);
+ return valueSize.value;
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index 44301ae..0b1f907 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
@SuppressWarnings("rawtypes")
@Override
@@ -39,14 +39,11 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new BloomMapFile.Writer(
- hdfsInfo.getConfiguration(),
- new Path(hdfsPath),
- MapFile.Writer.keyClass(keyWritableClass),
+ rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> { })
- );
+ MapFile.Writer.progressable(() -> {
+ }));
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -61,7 +58,7 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ return Long.sum(keySize.value, valueSize.value);
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -88,9 +85,9 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.setValue(getObject(keyWritable, keySize));
- value.setValue(getObject(valueWritable, valueSize));
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ key.value = getObject(keyWritable, keySize);
+ value.value = getObject(valueWritable, valueSize);
+ return Long.sum(keySize.value, valueSize.value);
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 8e105cd..affad6c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,13 +18,11 @@ package org.apache.camel.component.hdfs;
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import javax.security.auth.login.Configuration;
@@ -129,68 +127,54 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
}
private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
- final AtomicInteger totalMessageCount = new AtomicInteger(0);
+ final AtomicInteger messageCount = new AtomicInteger(0);
- List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
+ Arrays.stream(fileStatuses)
.filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
.filter(this::hasMatchingOwner)
.limit(endpointConfig.getMaxMessagesPerPoll())
- .map(this::asHdfsFile)
+ .map(this::createInputStream)
.filter(Objects::nonNull)
- .collect(Collectors.toList());
-
- log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+ .forEach(hdfsInputStream -> {
+ try {
+ processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
+ } finally {
+ IOHelper.close(hdfsInputStream, "input stream", log);
+ }
+ });
- for (int i = 0; i < hdfsFiles.size(); i++) {
- HdfsInputStream hdfsFile = hdfsFiles.get(i);
- try {
- int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
- log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
- log.debug("File [{}] was split to [{}] messages.", i, messageCount);
- } finally {
- IOHelper.close(hdfsFile, "hdfs file", log);
- }
- }
-
- return totalMessageCount.get();
+ return messageCount.get();
}
- private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
- final AtomicInteger messageCount = new AtomicInteger(0);
- Holder<Object> currentKey = new Holder<>();
- Holder<Object> currentValue = new Holder<>();
+ private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
+ Holder<Object> key = new Holder<>();
+ Holder<Object> value = new Holder<>();
- while (hdfsFile.next(currentKey, currentValue) >= 0) {
- processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
- messageCount.incrementAndGet();
+ while (inputStream.next(key, value) >= 0) {
+ processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
}
-
- return messageCount.get();
}
- private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
+ private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
Exchange exchange = this.getEndpoint().createExchange();
Message message = exchange.getIn();
- String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
+ String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
message.setHeader(Exchange.FILE_NAME, fileName);
message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
- message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
- if (key.getValue() != null) {
- message.setHeader(HdfsHeader.KEY.name(), key.getValue());
+ message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
+ if (key.value != null) {
+ message.setHeader(HdfsHeader.KEY.name(), key.value);
}
- if (hdfsFile.getNumOfReadBytes() >= 0) {
- message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
+ if (inputStream.getNumOfReadBytes() >= 0) {
+ message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
}
- message.setBody(value.getValue());
-
- updateNewExchange(exchange, messageCount.get(), hdfsFile);
+ message.setBody(value.value);
- log.debug("Processing file [{}]", fileName);
+ log.debug("Processing file {}", fileName);
try {
processor.process(exchange);
- totalMessageCount.incrementAndGet();
} catch (Exception e) {
exchange.setException(e);
}
@@ -199,6 +183,9 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
if (exchange.getException() != null) {
getExceptionHandler().handleException(exchange.getException());
}
+
+ int count = messageCount.incrementAndGet();
+ log.debug("Processed [{}] files out of [{}]", count, totalFiles);
}
private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -225,7 +212,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
return true;
}
- private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
+ private HdfsInputStream createInputStream(FileStatus fileStatus) {
try {
this.rwLock.writeLock().lock();
return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -234,19 +221,4 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
}
}
- protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
- // do not share unit of work
- exchange.setUnitOfWork(null);
-
- exchange.setProperty(Exchange.SPLIT_INDEX, index);
-
- if (hdfsFile.hasNext()) {
- exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
- } else {
- exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
- // streaming mode, so set total size when we are complete based on the index
- exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
- }
- }
-
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 8930587..430e20c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
NORMAL_FILE(new HdfsNormalFileHandler()),
SEQUENCE_FILE(new HdfsSequenceFileHandler()),
MAP_FILE(new HdfsMapFileHandler()),
- BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
+ BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
ARRAY_FILE(new HdfsArrayFileTypeHandler());
private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
}
- public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
+ public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
return this.file.next(hdfsInputStream, key, value);
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 8673ac9..68c22f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.RuntimeCamelException;
@@ -45,8 +43,6 @@ public class HdfsInputStream implements Closeable {
private boolean streamDownload;
- private EntryHolder cachedNextEntry;
-
protected HdfsInputStream() {
}
@@ -96,42 +92,20 @@ public class HdfsInputStream implements Closeable {
* @param value
* @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
*/
- public final long next(final Holder<Object> key, final Holder<Object> value) {
- EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
- cachedNextEntry = null;
-
- key.setValue(nextEntry.getKey().getValue());
- value.setValue(nextEntry.getValue().getValue());
-
- return nextEntry.getByteCount();
- }
-
- private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
+ public final long next(Holder<Object> key, Holder<Object> value) {
long nb = fileType.next(this, key, value);
// when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
// null value.value is the only indication that no (new) record/chunk was read
- if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
+ if (nb == 0 && numOfReadMessages.get() > 0) {
// we've read all chunks from file, which size is exact multiple the chunk size
- nb = -1;
- } else {
+ return -1;
+ }
+ if (value.value != null) {
numOfReadBytes.addAndGet(nb);
numOfReadMessages.incrementAndGet();
+ return nb;
}
-
- return new EntryHolder(key, value, nb);
- }
-
- /**
- */
- public final boolean hasNext() {
- if (Objects.isNull(cachedNextEntry)) {
- Holder<Object> nextKey = new Holder<>();
- Holder<Object> nextValue = new Holder<>();
- long nextByteCount = next(nextKey, nextValue);
- cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
- }
-
- return cachedNextEntry.hasNext();
+ return -1;
}
public final long getNumOfReadBytes() {
@@ -161,33 +135,4 @@ public class HdfsInputStream implements Closeable {
public boolean isStreamDownload() {
return streamDownload;
}
-
- private static class EntryHolder {
-
- private long byteCount;
- private Holder<Object> key;
- private Holder<Object> value;
-
- public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
- this.key = key;
- this.value = value;
- this.byteCount = byteCount;
- }
-
- public Holder<Object> getKey() {
- return key;
- }
-
- public Holder<Object> getValue() {
- return value;
- }
-
- public Boolean hasNext() {
- return byteCount >= 0;
- }
-
- public long getByteCount() {
- return byteCount;
- }
- }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index 4924ddf..b63463d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,14 +38,10 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new MapFile.Writer(
- hdfsInfo.getConfiguration(),
- new Path(hdfsPath),
- MapFile.Writer.keyClass(keyWritableClass),
- MapFile.Writer.valueClass(valueWritableClass),
+ rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> { })
- );
+ MapFile.Writer.progressable(() -> {
+ }));
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -60,7 +56,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = getWritable(value, exchange, valueSize);
((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ return Long.sum(keySize.value, valueSize.value);
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -87,9 +83,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.setValue(getObject(keyWritable, keySize));
- value.setValue(getObject(valueWritable, valueSize));
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ key.value = getObject(keyWritable, keySize);
+ value.value = getObject(valueWritable, valueSize);
+ return Long.sum(keySize.value, valueSize.value);
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 084e6d6..e17dd1d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,7 +106,8 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
InputStream inputStream = (InputStream) hdfsInputStream.getIn();
- value.setValue(inputStream);
+ key.value = null;
+ value.value = inputStream;
if (consumed) {
return 0;
@@ -123,11 +124,13 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
if (bytesRead >= 0) {
outputStream.write(buf, 0, bytesRead);
- value.setValue(outputStream);
+ key.value = null;
+ value.value = outputStream;
return bytesRead;
} else {
+ key.value = null;
// indication that we may have read from empty file
- value.setValue(outputStream);
+ value.value = outputStream;
return 0;
}
} catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 3c53dae..d755ebc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,18 +35,12 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = SequenceFile.createWriter(
- hdfsInfo.getConfiguration(),
- SequenceFile.Writer.file(hdfsInfo.getPath()),
- SequenceFile.Writer.keyClass(keyWritableClass),
- SequenceFile.Writer.valueClass(valueWritableClass),
- SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
- SequenceFile.Writer.replication(endpointConfig.getReplication()),
- SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+ rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+ SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+ SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- SequenceFile.Writer.progressable(() -> { }),
- SequenceFile.Writer.metadata(new SequenceFile.Metadata())
- );
+ SequenceFile.Writer.progressable(() -> {
+ }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
return rout;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -63,7 +57,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
writer.append(keyWritable, valueWritable);
writer.sync();
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ return Long.sum(keySize.value, valueSize.value);
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
@@ -90,9 +84,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
Holder<Integer> valueSize = new Holder<>();
Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
if (reader.next(keyWritable, valueWritable)) {
- key.setValue(getObject(keyWritable, keySize));
- value.setValue(getObject(valueWritable, valueSize));
- return Long.sum(keySize.getValue(), valueSize.getValue());
+ key.value = getObject(keyWritable, keySize);
+ value.value = getObject(valueWritable, valueSize);
+ return Long.sum(keySize.value, valueSize.value);
} else {
return 0;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 7f2de0f..07c5eb8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(0);
+ size.value = 0;
return NullWritable.get();
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(0);
+ size.value = 0;
return null;
}
}
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
ByteWritable writable = new ByteWritable();
writable.set(typeConverter.convertTo(Byte.class, value));
return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((ByteWritable) writable).get();
}
}
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
BooleanWritable writable = new BooleanWritable();
writable.set(typeConverter.convertTo(Boolean.class, value));
return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((BooleanWritable) writable).get();
}
}
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
BytesWritable writable = new BytesWritable();
ByteBuffer bb = (ByteBuffer) value;
writable.set(bb.array(), 0, bb.array().length);
- size.setValue(bb.array().length);
+ size.value = bb.array().length;
return writable;
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(((BytesWritable) writable).getLength());
- ByteBuffer bb = ByteBuffer.allocate(size.getValue());
- bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
+ size.value = ((BytesWritable) writable).getLength();
+ ByteBuffer bb = ByteBuffer.allocate(size.value);
+ bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
return bb;
}
}
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
DoubleWritable writable = new DoubleWritable();
writable.set(typeConverter.convertTo(Double.class, value));
return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((DoubleWritable) writable).get();
}
}
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
FloatWritable writable = new FloatWritable();
writable.set(typeConverter.convertTo(Float.class, value));
return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((FloatWritable) writable).get();
}
}
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
IntWritable writable = new IntWritable();
writable.set(typeConverter.convertTo(Integer.class, value));
return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((IntWritable) writable).get();
}
}
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
@Override
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
LongWritable writable = new LongWritable();
writable.set(typeConverter.convertTo(Long.class, value));
return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(SIZE);
+ size.value = SIZE;
return ((LongWritable) writable).get();
}
}
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
Text writable = new Text();
writable.set(typeConverter.convertTo(String.class, value));
- size.setValue(writable.getBytes().length);
+ size.value = writable.getBytes().length;
return writable;
}
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(((Text) writable).getLength());
+ size.value = ((Text) writable).getLength();
return writable.toString();
}
}
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
BytesWritable writable = new BytesWritable();
writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
- size.setValue(bos.toByteArray().length);
+ size.value = bos.toByteArray().length;
return writable;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
@Override
public Object read(Writable writable, Holder<Integer> size) {
- size.setValue(0);
+ size.value = 0;
return null;
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 458f8ea..8a02dcd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
/**
* The value contained in the holder.
**/
- private T value;
+ public T value;
/**
* Creates a new holder with a <code>null</code> value.
@@ -37,12 +37,4 @@ public final class Holder<T> {
public Holder(T value) {
this.value = value;
}
-
- public T getValue() {
- return value;
- }
-
- public void setValue(T value) {
- this.value = value;
- }
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index 2841a14..ce6b334 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
@@ -56,7 +55,6 @@ public class HdfsInputStreamTest {
fileSystem = mock(FileSystem.class);
configuration = mock(Configuration.class);
Path path = mock(Path.class);
- FileStatus fileStatus = mock(FileStatus.class);
when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -65,11 +63,6 @@ public class HdfsInputStreamTest {
when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
when(hdfsInfo.getConfiguration()).thenReturn(configuration);
when(hdfsInfo.getPath()).thenReturn(path);
-
- when(path.getFileSystem(configuration)).thenReturn(fileSystem);
-
- when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
- when(fileStatus.getLen()).thenReturn(1000L);
}
@Test