You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/04 07:49:03 UTC
[camel] branch master updated: CAMEL-14132 - camel-hdfs: Producer
throws exception when body is RemoteFile (#3310)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e4e7147 CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310)
e4e7147 is described below
commit e4e71473d62e378adce63d6a76a891f379b6affe
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Mon Nov 4 08:48:51 2019 +0100
CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310)
* hdfs stream
* hdfs stream
* Migrate towards a more hdfs object based design
* Migrate towards a more hdfs object based design
* CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile
---
.../camel/component/hdfs/DefaultHdfsFile.java | 9 +--
.../component/hdfs/HdfsArrayFileTypeHandler.java | 63 +++++++++--------
.../component/hdfs/HdfsBloommapFileHandler.java | 74 ++++++++++----------
.../apache/camel/component/hdfs/HdfsConsumer.java | 12 +---
.../org/apache/camel/component/hdfs/HdfsFile.java | 12 ++--
.../apache/camel/component/hdfs/HdfsFileType.java | 6 +-
.../camel/component/hdfs/HdfsInputStream.java | 18 ++---
.../camel/component/hdfs/HdfsMapFileHandler.java | 72 ++++++++++----------
.../component/hdfs/HdfsNormalFileHandler.java | 76 +++++++++++++--------
.../camel/component/hdfs/HdfsOutputStream.java | 6 +-
.../apache/camel/component/hdfs/HdfsProducer.java | 5 +-
.../component/hdfs/HdfsSequenceFileHandler.java | 78 +++++++++++-----------
.../camel/component/hdfs/HdfsConsumerTest.java | 68 +++++++++++++++++++
13 files changed, 293 insertions(+), 206 deletions(-)
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 76942c8..cc0845e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hdfs;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -24,7 +25,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
@@ -37,7 +38,7 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-abstract class DefaultHdfsFile implements HdfsFile {
+abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> implements HdfsFile<T, U, Object, Object> {
protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
long numBytes = 0;
@@ -61,13 +62,13 @@ abstract class DefaultHdfsFile implements HdfsFile {
return numBytes;
}
- protected final Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) {
+ protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
Class<?> objCls = obj == null ? null : obj.getClass();
HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
if (objWritableFactory == null) {
objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
}
- return objWritableFactory.create(obj, typeConverter, size);
+ return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
}
protected final Object getObject(Writable writable, Holder<Integer> size) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 35bb5a9..2a0e68c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -16,72 +16,71 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.Closeable;
import java.io.IOException;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsArrayFileTypeHandler extends DefaultHdfsFile {
+class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFile.Reader> {
+ @SuppressWarnings("rawtypes")
@Override
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+ public ArrayFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
try {
- Holder<Integer> valueSize = new Holder<>();
- Writable valueWritable = getWritable(value, typeConverter, valueSize);
- ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
- return valueSize.value;
- } catch (Exception ex) {
+ ArrayFile.Writer rout;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+ Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+ rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+ endpointConfig.getCompressionType(), () -> { });
+ return rout;
+ } catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
}
@Override
- public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
try {
- ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn();
Holder<Integer> valueSize = new Holder<>();
- Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
- if (reader.next(valueWritable) != null) {
- value.value = getObject(valueWritable, valueSize);
- return valueSize.value;
- } else {
- return 0;
- }
+ Writable valueWritable = getWritable(value, exchange, valueSize);
+ ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
+ return valueSize.value;
} catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
}
- @SuppressWarnings("rawtypes")
@Override
- public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ public ArrayFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
try {
- Closeable rout;
+ ArrayFile.Reader rin;
HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
- Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
- endpointConfig.getCompressionType(), () -> { });
- return rout;
+ rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
+ return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
}
@Override
- public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
try {
- Closeable rin;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
- return rin;
- } catch (IOException ex) {
+ ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn();
+ Holder<Integer> valueSize = new Holder<>();
+ Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+ if (reader.next(valueWritable) != null) {
+ value.value = getObject(valueWritable, valueSize);
+ return valueSize.value;
+ } else {
+ return 0;
+ }
+ } catch (Exception ex) {
throw new RuntimeCamelException(ex);
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index 422e4e6..0b1f907 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -16,11 +16,10 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.Closeable;
import java.io.IOException;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BloomMapFile;
@@ -29,15 +28,35 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsBloommapFileHandler extends DefaultHdfsFile {
+class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public BloomMapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ BloomMapFile.Writer rout;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+ Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+ Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+ rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+ MapFile.Writer.valueClass(valueWritableClass),
+ MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+ MapFile.Writer.progressable(() -> {
+ }));
+ return rout;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
@Override
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
try {
Holder<Integer> keySize = new Holder<>();
- Writable keyWritable = getWritable(key, typeConverter, keySize);
+ Writable keyWritable = getWritable(key, exchange, keySize);
Holder<Integer> valueSize = new Holder<>();
- Writable valueWritable = getWritable(value, typeConverter, valueSize);
+ Writable valueWritable = getWritable(value, exchange, valueSize);
((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
return Long.sum(keySize.value, valueSize.value);
} catch (Exception ex) {
@@ -46,6 +65,18 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile {
}
@Override
+ public BloomMapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ BloomMapFile.Reader rin;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
+ return rin;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
+
+ @Override
public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
try {
MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn();
@@ -65,35 +96,4 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile {
}
}
- @SuppressWarnings("rawtypes")
- @Override
- public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rout;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
- Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
- Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
- MapFile.Writer.valueClass(valueWritableClass),
- MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> {
- }));
- return rout;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
-
- @Override
- public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rin;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
- return rin;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 7b3f4e4..4998d7a 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -29,6 +29,7 @@ import javax.security.auth.login.Configuration;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.ScheduledPollConsumer;
import org.apache.camel.util.IOHelper;
import org.apache.commons.lang.StringUtils;
@@ -148,15 +149,8 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
Holder<Object> key = new Holder<>();
Holder<Object> value = new Holder<>();
- if (this.endpointConfig.isStreamDownload()) {
- key.value = null;
- value.value = inputStream;
- // use the input stream as the body
+ while (inputStream.next(key, value) >= 0) {
processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
- } else {
- while (inputStream.next(key, value) >= 0) {
- processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
- }
}
}
@@ -201,7 +195,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
return false;
}
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeCamelException(e);
}
}
return true;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
index 1e2d63f..9979adf 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
@@ -18,16 +18,16 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
-interface HdfsFile {
+interface HdfsFile<T extends Closeable, U extends Closeable, K, V> {
- Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
+ T createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
- long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter);
+ long append(HdfsOutputStream hdfsOutputStream, K key, V value, Exchange exchange);
- Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
+ U createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory);
- long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
+ long next(HdfsInputStream hdfsInputStream, Holder<K> key, Holder<V> value);
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 7d4a239..430e20c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
-import org.apache.camel.TypeConverter;
+import org.apache.camel.Exchange;
public enum HdfsFileType {
@@ -38,8 +38,8 @@ public enum HdfsFileType {
return this.file.createOutputStream(hdfsPath, hdfsInfoFactory);
}
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
- return this.file.append(hdfsOutputStream, key, value, typeConverter);
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
+ return this.file.append(hdfsOutputStream, key, value, exchange);
}
public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 8f12aef..68c22f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.camel.RuntimeCamelException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
@@ -30,6 +31,7 @@ public class HdfsInputStream implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HdfsInputStream.class);
private HdfsFileType fileType;
+ private HdfsInfo info;
private String actualPath;
private String suffixedPath;
private String suffixedReadPath;
@@ -39,7 +41,7 @@ public class HdfsInputStream implements Closeable {
private final AtomicLong numOfReadBytes = new AtomicLong(0L);
private final AtomicLong numOfReadMessages = new AtomicLong(0L);
- private HdfsConfiguration config;
+ private boolean streamDownload;
protected HdfsInputStream() {
}
@@ -49,7 +51,6 @@ public class HdfsInputStream implements Closeable {
* @param hdfsPath
* @param hdfsInfoFactory
* @return
- * @throws IOException
*/
public static HdfsInputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
@@ -59,18 +60,18 @@ public class HdfsInputStream implements Closeable {
iStream.suffixedPath = iStream.actualPath + '.' + endpointConfig.getOpenedSuffix();
iStream.suffixedReadPath = iStream.actualPath + '.' + endpointConfig.getReadSuffix();
iStream.chunkSize = endpointConfig.getChunkSize();
+ iStream.streamDownload = endpointConfig.isStreamDownload();
try {
- HdfsInfo info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath);
- if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
+ iStream.info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath);
+ if (iStream.info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, hdfsInfoFactory);
iStream.opened = true;
- iStream.config = endpointConfig;
} else {
LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
iStream = null;
}
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeCamelException(e);
}
return iStream;
@@ -80,8 +81,6 @@ public class HdfsInputStream implements Closeable {
public final void close() throws IOException {
if (opened) {
IOUtils.closeStream(in);
- HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config);
- HdfsInfo info = hdfsInfoFactory.newHdfsInfo(actualPath);
info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
opened = false;
}
@@ -133,4 +132,7 @@ public class HdfsInputStream implements Closeable {
return opened;
}
+ public boolean isStreamDownload() {
+ return streamDownload;
+ }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index 1c19162..b63463d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -16,11 +16,10 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.Closeable;
import java.io.IOException;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
@@ -28,15 +27,34 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsMapFileHandler extends DefaultHdfsFile {
+class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader> {
@Override
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+ @SuppressWarnings("rawtypes")
+ public MapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ MapFile.Writer rout;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+ Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+ Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+ rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+ MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+ MapFile.Writer.progressable(() -> {
+ }));
+ return rout;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
+
+ @Override
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
try {
Holder<Integer> keySize = new Holder<>();
- Writable keyWritable = getWritable(key, typeConverter, keySize);
+ Writable keyWritable = getWritable(key, exchange, keySize);
Holder<Integer> valueSize = new Holder<>();
- Writable valueWritable = getWritable(value, typeConverter, valueSize);
+ Writable valueWritable = getWritable(value, exchange, valueSize);
((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
return Long.sum(keySize.value, valueSize.value);
} catch (Exception ex) {
@@ -45,6 +63,18 @@ class HdfsMapFileHandler extends DefaultHdfsFile {
}
@Override
+ public MapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ MapFile.Reader rin;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
+ return rin;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
+
+ @Override
public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
try {
MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn();
@@ -64,34 +94,4 @@ class HdfsMapFileHandler extends DefaultHdfsFile {
}
}
- @Override
- @SuppressWarnings("rawtypes")
- public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rout;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
- Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
- Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
- MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- MapFile.Writer.progressable(() -> {
- }));
- return rout;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
-
- @Override
- public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rin;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
- return rin;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 952bde8..e17dd1d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -17,15 +17,15 @@
package org.apache.camel.component.hdfs;
import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,22 +33,24 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-class HdfsNormalFileHandler extends DefaultHdfsFile {
+class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
+
+ private boolean consumed;
@Override
- public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ public OutputStream createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
try {
- FSDataOutputStream rout;
+ OutputStream outputStream;
HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
if (endpointConfig.isAppend()) {
- rout = hdfsInfo.getFileSystem().append(
+ outputStream = hdfsInfo.getFileSystem().append(
hdfsInfo.getPath(),
endpointConfig.getBufferSize(),
() -> { }
);
} else {
- rout = hdfsInfo.getFileSystem().create(
+ outputStream = hdfsInfo.getFileSystem().create(
hdfsInfo.getPath(),
endpointConfig.isOverwrite(),
endpointConfig.getBufferSize(),
@@ -57,37 +59,37 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
() -> { }
);
}
- return rout;
+ return outputStream;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
}
@Override
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
- InputStream is = null;
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
+ InputStream inputStream = null;
try {
- is = typeConverter.convertTo(InputStream.class, value);
- return copyBytes(is, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
+ inputStream = exchange.getContext().getTypeConverter().convertTo(InputStream.class, exchange, value);
+ return copyBytes(inputStream, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
} finally {
- IOHelper.close(is);
+ IOHelper.close(inputStream);
}
}
@Override
- public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ public InputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
try {
- Closeable rin;
+ InputStream inputStream;
HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
if (endpointConfig.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+ inputStream = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
} else {
- rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig));
+ inputStream = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig));
}
- return rin;
+ return inputStream;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
@@ -95,19 +97,40 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
@Override
public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+ if (hdfsInputStream.isStreamDownload()) {
+ return nextAsWrappedStream(hdfsInputStream, key, value);
+ } else {
+ return nextAsOutputStream(hdfsInputStream, key, value);
+ }
+ }
+
+ private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+ InputStream inputStream = (InputStream) hdfsInputStream.getIn();
+ key.value = null;
+ value.value = inputStream;
+
+ if (consumed) {
+ return 0;
+ } else {
+ consumed = true;
+ return 1;
+ }
+ }
+
+ private long nextAsOutputStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
byte[] buf = new byte[hdfsInputStream.getChunkSize()];
int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
if (bytesRead >= 0) {
- bos.write(buf, 0, bytesRead);
+ outputStream.write(buf, 0, bytesRead);
key.value = null;
- value.value = bos;
+ value.value = outputStream;
return bytesRead;
} else {
key.value = null;
// indication that we may have read from empty file
- value.value = bos;
+ value.value = outputStream;
return 0;
}
} catch (IOException ex) {
@@ -117,18 +140,17 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
try {
- String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
+ String fileName = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
// [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile
-
File outputDest;
try {
// First trying: Files.createTempFile
- outputDest = Files.createTempFile(fname, ".hdfs").toFile();
+ outputDest = Files.createTempFile(fileName, ".hdfs").toFile();
} catch (Exception ex) {
// Now trying: File.createTempFile
- outputDest = File.createTempFile(fname, ".hdfs");
+ outputDest = File.createTempFile(fileName, ".hdfs");
}
if (outputDest.exists()) {
@@ -150,7 +172,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile {
return outputDest;
}
- return new File(outputDest, fname);
+ return new File(outputDest, fileName);
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index b93b93e..c192d8c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -21,8 +21,8 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
@@ -84,10 +84,10 @@ public class HdfsOutputStream implements Closeable {
}
}
- public void append(Object key, Object value, TypeConverter typeConverter) {
+ public void append(Object key, Object value, Exchange exchange) {
try {
busy.set(true);
- long nb = fileType.append(this, key, value, typeConverter);
+ long nb = fileType.append(this, key, value, exchange);
numOfWrittenBytes.addAndGet(nb);
numOfWrittenMessages.incrementAndGet();
lastAccess.set(System.currentTimeMillis());
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 8f407d8..0877eef 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StringHelper;
@@ -114,7 +115,7 @@ public class HdfsProducer extends DefaultProducer {
} catch (Exception e) {
log.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
log.debug("", e);
- throw new RuntimeException(e);
+ throw new RuntimeCamelException(e);
} finally {
HdfsComponent.setJAASConfiguration(auth);
}
@@ -211,7 +212,7 @@ public class HdfsProducer extends DefaultProducer {
String path = oStream.getActualPath();
log.trace("Writing body to hdfs-file {}", path);
- oStream.append(key, body, exchange.getContext().getTypeConverter());
+ oStream.append(key, body, exchange);
idle.set(false);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 936a2a8..d755ebc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -16,25 +16,44 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.Closeable;
import java.io.IOException;
+import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-class HdfsSequenceFileHandler extends DefaultHdfsFile {
+class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, SequenceFile.Reader> {
@Override
- public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) {
+ public SequenceFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ SequenceFile.Writer rout;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
+ Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
+ Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
+ rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+ SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+ SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+ SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
+ SequenceFile.Writer.progressable(() -> {
+ }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
+ return rout;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
+
+ @Override
+ public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) {
try {
Holder<Integer> keySize = new Holder<>();
- Writable keyWritable = getWritable(key, typeConverter, keySize);
+ Writable keyWritable = getWritable(key, exchange, keySize);
Holder<Integer> valueSize = new Holder<>();
- Writable valueWritable = getWritable(value, typeConverter, valueSize);
+ Writable valueWritable = getWritable(value, exchange, valueSize);
SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
writer.append(keyWritable, valueWritable);
writer.sync();
@@ -45,9 +64,21 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile {
}
@Override
- public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+ public SequenceFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
+ try {
+ SequenceFile.Reader rin;
+ HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
+ rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
+ return rin;
+ } catch (IOException ex) {
+ throw new RuntimeCamelException(ex);
+ }
+ }
+
+ @Override
+ public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
try {
- SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn();
+ SequenceFile.Reader reader = (SequenceFile.Reader) hdfsInputStream.getIn();
Holder<Integer> keySize = new Holder<>();
Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
Holder<Integer> valueSize = new Holder<>();
@@ -64,35 +95,4 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile {
}
}
- @Override
- public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rout;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
- Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
- Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
- rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
- SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
- SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
- SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
- SequenceFile.Writer.progressable(() -> {
- }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
- return rout;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
-
- @Override
- public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) {
- try {
- Closeable rin;
- HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
- rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
- return rin;
- } catch (IOException ex) {
- throw new RuntimeCamelException(ex);
- }
- }
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index cc45053..c744901 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel.component.hdfs;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
@@ -27,11 +31,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_OPENED_SUFFIX;
import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_READ_SUFFIX;
import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -129,6 +136,7 @@ public class HdfsConsumerTest {
when(endpointConfig.getOwner()).thenReturn("spiderman");
when(endpointConfig.isConnectOnStartup()).thenReturn(true);
when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+ when(endpointConfig.getChunkSize()).thenReturn(100 * 1000);
when(endpoint.getCamelContext()).thenReturn(context);
when(endpoint.createExchange()).thenReturn(new DefaultExchange(context));
when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
@@ -149,6 +157,8 @@ public class HdfsConsumerTest {
when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+
underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
// when
@@ -156,6 +166,64 @@ public class HdfsConsumerTest {
// then
assertThat(actual, is(1));
+ verify(processor, times(1)).process(exchangeCaptor.capture());
+ Exchange exchange = exchangeCaptor.getValue();
+ assertThat(exchange, notNullValue());
+
+ ByteArrayOutputStream body = exchange.getIn().getBody(ByteArrayOutputStream.class);
+ assertThat(body, notNullValue());
+ assertThat(body.toString(), startsWith("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam eget fermentum arcu, vel dignissim ipsum."));
+
+ }
+
+ @Test
+ public void doPollFromExistingLocalFileWithStreamDownload() throws Exception {
+ // given
+ String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+ when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL);
+ when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE);
+ when(endpointConfig.getPath()).thenReturn(hdfsPath);
+ when(endpointConfig.getOwner()).thenReturn("spiderman");
+ when(endpointConfig.isConnectOnStartup()).thenReturn(true);
+ when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL");
+ when(endpointConfig.getChunkSize()).thenReturn(100 * 1000);
+ when(endpointConfig.isStreamDownload()).thenReturn(true);
+ when(endpoint.getCamelContext()).thenReturn(context);
+ when(endpoint.createExchange()).thenReturn(new DefaultExchange(context));
+ when(endpoint.getEndpointUri()).thenReturn(hdfsPath);
+
+ when(fileSystem.isFile(any(Path.class))).thenReturn(true);
+
+ FileStatus[] fileStatuses = new FileStatus[1];
+ FileStatus fileStatus = mock(FileStatus.class);
+ fileStatuses[0] = fileStatus;
+ when(fileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+ when(fileStatus.getPath()).thenReturn(new Path(hdfsPath));
+ when(fileStatus.isFile()).thenReturn(true);
+ when(fileStatus.isDirectory()).thenReturn(false);
+ when(fileStatus.getOwner()).thenReturn("spiderman");
+
+ String normalFile = CWD.getAbsolutePath() + "/src/test/resources/hdfs/normal_file.txt";
+ FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockDataInputStream(normalFile));
+ when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true);
+ when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream);
+
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+
+ underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath));
+
+ // when
+ int actual = underTest.doPoll();
+
+ // then
+ assertThat(actual, is(1));
+ verify(processor, times(1)).process(exchangeCaptor.capture());
+ Exchange exchange = exchangeCaptor.getValue();
+ assertThat(exchange, notNullValue());
+
+ InputStream body = (InputStream) exchange.getIn().getBody();
+ assertThat(body, notNullValue());
+
}
}
\ No newline at end of file