You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/04 12:00:03 UTC

[2/2] camel git commit: [CAMEL-8434] Allow consuming empty files in camel-hdfs2

[CAMEL-8434] Allow consuming empty files in camel-hdfs2


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4e314d26
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4e314d26
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4e314d26

Branch: refs/heads/master
Commit: 4e314d2691416e2e01be3a4457db3e21f90be41c
Parents: a547de1
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Wed Mar 4 11:19:00 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Wed Mar 4 11:58:12 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hdfs2/HdfsConsumer.java     |  2 +-
 .../camel/component/hdfs2/HdfsFileType.java     |  3 +-
 .../camel/component/hdfs2/HdfsInputStream.java  | 17 +++-
 .../camel/component/hdfs2/HdfsConsumerTest.java | 83 ++++++++++++++++++++
 4 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
index 5b52f8b..28192fa 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
@@ -145,7 +145,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             try {
                 Holder<Object> key = new Holder<Object>();
                 Holder<Object> value = new Holder<Object>();
-                while (this.istream.next(key, value) != 0) {
+                while (this.istream.next(key, value) >= 0) {
                     Exchange exchange = this.getEndpoint().createExchange();
                     Message message = new DefaultMessage();
                     String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");

http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
index bbe0172..e6ec656 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java
@@ -85,7 +85,8 @@ public enum HdfsFileType {
                     return bytesRead;
                 } else {
                     key.value = null;
-                    value.value = null;
+                    // indication that we may have read from empty file
+                    value.value = bos;
                     return 0;
                 }
             } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
index 8e761e1..50970dd 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -62,13 +62,26 @@ public class HdfsInputStream implements Closeable {
         }
     }
 
+    /**
+     * Reads the next record/chunk, as defined by the configured file type.
+     * @param key holder that receives the key of the record read (the file type may set it to null)
+     * @param value holder that receives the record/chunk read; a null value means nothing was read
+     * @return number of bytes read (0 is valid, e.g. the single empty record of an empty file), or -1 when no record was read
+     */
     public final long next(Holder<Object> key, Holder<Object> value) {
         long nb = fileType.next(this, key, value);
-        if (nb > 0) {
+        // zero bytes read may still be a record (e.g. an empty file yields one empty record);
+        // a null value.value is the only indication that no (new) record/chunk was read
+        if (nb == 0 && numOfReadMessages.get() > 0) {
+            // a record was already emitted, so zero bytes means EOF (counting bytes here would never end for an empty file)
+            return -1;
+        }
+        if (value.value != null) {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
+            return nb;
         }
-        return nb;
+        return -1;
     }
 
     public final long getNumOfReadBytes() {

http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
index 48c155f..16aedc2 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hdfs2;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -108,6 +109,88 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testSimpleConsumerWithEmptyFile() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(file.toUri(), conf);
+        FSDataOutputStream out = fs.create(file);
+        out.close(); // nothing is written, so the file is empty
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1); // an empty file is still consumed as one (empty) message
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0));
+    }
+
+    @Test
+    public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(file.toUri(), conf);
+        FSDataOutputStream out = fs.create(file);
+        // 42 writes of 5 bytes = 210 bytes = exactly 5 chunks of 42 bytes (chunkSize below)
+        for (int i = 0; i < 42; ++i) {
+            out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
+            out.flush();
+        }
+        out.close();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(5); // one message per chunk, with no extra empty message at EOF
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42));
+    }
+
+    @Test
+    public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+        writer.sync();
+        writer.close(); // no records are appended before closing
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(0); // a sequence file without records produces no messages
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testReadWithReadSuffix() throws Exception {
         if (!canTest()) {
             return;