You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/04 12:00:03 UTC
[2/2] camel git commit: [CAMEL-8434] Allow consuming empty files in
camel-hdfs2
[CAMEL-8434] Allow consuming empty files in camel-hdfs2
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4e314d26
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4e314d26
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4e314d26
Branch: refs/heads/master
Commit: 4e314d2691416e2e01be3a4457db3e21f90be41c
Parents: a547de1
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Wed Mar 4 11:19:00 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Wed Mar 4 11:58:12 2015 +0100
----------------------------------------------------------------------
.../camel/component/hdfs2/HdfsConsumer.java | 2 +-
.../camel/component/hdfs2/HdfsFileType.java | 3 +-
.../camel/component/hdfs2/HdfsInputStream.java | 17 +++-
.../camel/component/hdfs2/HdfsConsumerTest.java | 83 ++++++++++++++++++++
4 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
index 5b52f8b..28192fa 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
@@ -145,7 +145,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
try {
Holder<Object> key = new Holder<Object>();
Holder<Object> value = new Holder<Object>();
- while (this.istream.next(key, value) != 0) {
+ while (this.istream.next(key, value) >= 0) {
Exchange exchange = this.getEndpoint().createExchange();
Message message = new DefaultMessage();
String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
index bbe0172..e6ec656 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
@@ -85,7 +85,8 @@ public enum HdfsFileType {
return bytesRead;
} else {
key.value = null;
- value.value = null;
+ // indication that we may have read from an empty file
+ value.value = bos;
return 0;
}
} catch (IOException ex) {
http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
index 8e761e1..50970dd 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -62,13 +62,26 @@ public class HdfsInputStream implements Closeable {
}
}
+ /**
+ * Reads the next record/chunk specific to the given file type.
+ * @param key
+ * @param value
+ * @return number of bytes read. 0 is a valid number of bytes (empty file), -1 indicates no record was read
+ */
public final long next(Holder<Object> key, Holder<Object> value) {
long nb = fileType.next(this, key, value);
- if (nb > 0) {
+ // when zero bytes were read from the given type of file, we may still have a record (e.g., empty file)
+ // a null value.value is the only indication that no (new) record/chunk was read
+ if (nb == 0 && numOfReadBytes.get() > 0) {
+ // we've read all chunks from the file, whose size is an exact multiple of the chunk size
+ return -1;
+ }
+ if (value.value != null) {
numOfReadBytes.addAndGet(nb);
numOfReadMessages.incrementAndGet();
+ return nb;
}
- return nb;
+ return -1;
}
public final long getNumOfReadBytes() {
http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
index 48c155f..16aedc2 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hdfs2;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -108,6 +109,88 @@ public class HdfsConsumerTest extends HdfsTestSupport {
}
@Test
+ public void testSimpleConsumerWithEmptyFile() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(file.toUri(), conf);
+ FSDataOutputStream out = fs.create(file);
+ out.close();
+
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.assertIsSatisfied();
+ assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0));
+ }
+
+ @Test
+ public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(file.toUri(), conf);
+ FSDataOutputStream out = fs.create(file);
+ // size = 5 times chunk size = 210 bytes
+ for (int i = 0; i < 42; ++i) {
+ out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
+ out.flush();
+ }
+ out.close();
+
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(5);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.assertIsSatisfied();
+ assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42));
+ }
+
+ @Test
+ public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
+ Configuration conf = new Configuration();
+ SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+ writer.sync();
+ writer.close();
+
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(0);
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ @Test
public void testReadWithReadSuffix() throws Exception {
if (!canTest()) {
return;