Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/05 13:39:29 UTC

camel git commit: [CAMEL-7318] Improve hdfs:// consumer concurrency

Repository: camel
Updated Branches:
  refs/heads/master f13ed8e9b -> 4028e2b33


[CAMEL-7318] Improve hdfs:// consumer concurrency


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4028e2b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4028e2b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4028e2b3

Branch: refs/heads/master
Commit: 4028e2b33b7f424e82fe6756c500d7d01a9ce77b
Parents: f13ed8e
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Thu Mar 5 13:37:10 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Thu Mar 5 13:37:10 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsConsumer.java      |  6 ++
 .../camel/component/hdfs/HdfsInputStream.java   | 13 ++++-
 .../camel/component/hdfs/HdfsConsumerTest.java  | 49 ++++++++++++++++
 .../camel/component/hdfs2/HdfsConsumer.java     |  6 ++
 .../camel/component/hdfs2/HdfsInputStream.java  | 13 ++++-
 .../camel/component/hdfs2/HdfsConsumerTest.java | 49 ++++++++++++++++
 .../hdfs2/integration/HdfsAppendTest.java       | 33 +++++++++++
 .../HdfsProducerConsumerIntegrationTest.java    | 59 ++++++++++++++++++++
 8 files changed, 222 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
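Note on the change: before reading a file, an HDFS consumer claims it by renaming it with the configured read suffix. FileSystem.rename() returns false when the source no longer exists (i.e. another consumer already claimed it), but the pre-fix code ignored that return value and opened the stream unconditionally, so concurrent consumers could race on the same file. This commit turns the rename into an atomic claim. A minimal sketch of the idiom, assuming the standard Hadoop FileSystem API (the class and method names below are illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class RenameClaim {

        private RenameClaim() {
        }

        // Returns the claimed path, or null when another consumer won the race.
        public static Path tryClaim(FileSystem fs, Path file, String readSuffix) throws IOException {
            Path claimed = new Path(file.toString() + '.' + readSuffix);
            // HDFS rename is atomic, so exactly one concurrent caller sees "true"
            return fs.rename(file, claimed) ? claimed : null;
        }
    }

Callers then skip the file when tryClaim() returns null, which is exactly what the new isOpened() check in HdfsConsumer below does.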


http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index f718238..a8d09db 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -138,6 +138,12 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             try {
                 this.rwlock.writeLock().lock();
                 this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+                if (!this.istream.isOpened()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath().toString());
+                    }
+                    continue;
+                }
             } finally {
                 this.rwlock.writeLock().unlock();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 24342f6..e0b1562 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -46,9 +46,12 @@ public class HdfsInputStream implements Closeable {
         ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
         HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
-        info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
-        ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
-        ret.opened = true;
+        if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
+            ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
+            ret.opened = true;
+        } else {
+            ret.opened = false;
+        }
         return ret;
     }
 
@@ -98,4 +101,8 @@ public class HdfsInputStream implements Closeable {
         return in;
     }
 
+    public boolean isOpened() {
+        return opened;
+    }
+
 }
\ No newline at end of file
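The race this guards against is easy to reproduce outside Camel: two renames of the same source can never both succeed. A small stand-alone demo, assuming Hadoop's LocalFileSystem and hypothetical paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameRaceDemo {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path src = new Path("target/demo/file.txt");
            Path dst = new Path("target/demo/file.txt.read");
            fs.mkdirs(src.getParent());
            fs.create(src).close();
            System.out.println(fs.rename(src, dst)); // true: this caller claimed the file
            System.out.println(fs.rename(src, dst)); // false: src is gone, another consumer claimed it
        }
    }

With the pre-fix code, the second caller would have ignored the false result and opened file.txt.read anyway, so two consumers could process the same file; with the fix, opened stays false and the consumer skips it.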

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 294fae2..5478df8 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -18,14 +18,18 @@ package org.apache.camel.component.hdfs;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
@@ -108,6 +112,51 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testConcurrentConsumers() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        int ITERATIONS = 200;
+
+        final File dir = new File("target/test/multiple-consumers");
+        dir.mkdirs();
+        for (int i = 1; i <= ITERATIONS; i++) {
+            FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
+            fos.write(String.format("hello (%04d)\n", i).getBytes());
+            fos.close();
+        }
+
+        final Set<String> fileNames = new HashSet<String>();
+        final CountDownLatch latch = new CountDownLatch(ITERATIONS);
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+                latch.countDown();
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0").to("mock:result");
+                from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0").to("mock:result");
+                from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0").to("mock:result");
+                from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedMessageCount(ITERATIONS);
+
+        latch.await(30, TimeUnit.SECONDS);
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(fileNames.size(), equalTo(ITERATIONS));
+    }
+
+    @Test
     public void testSimpleConsumerWithEmptyFile() throws Exception {
         if (!canTest()) {
             return;

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
index 28192fa..395f00b 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
@@ -138,6 +138,12 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             try {
                 this.rwlock.writeLock().lock();
                 this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+                if (!this.istream.isOpened()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath().toString());
+                    }
+                    continue;
+                }
             } finally {
                 this.rwlock.writeLock().unlock();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
index 50970dd..b38beaf 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -46,9 +46,12 @@ public class HdfsInputStream implements Closeable {
         ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
         HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
-        info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
-        ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
-        ret.opened = true;
+        if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
+            ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
+            ret.opened = true;
+        } else {
+            ret.opened = false;
+        }
         return ret;
     }
 
@@ -104,4 +107,8 @@ public class HdfsInputStream implements Closeable {
         return in;
     }
 
+    public boolean isOpened() {
+        return opened;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
index b9253b7..55f7377 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
@@ -18,15 +18,19 @@ package org.apache.camel.component.hdfs2;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
@@ -109,6 +113,51 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testConcurrentConsumers() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        int ITERATIONS = 200;
+
+        final File dir = new File("target/test/multiple-consumers");
+        dir.mkdirs();
+        for (int i = 1; i <= ITERATIONS; i++) {
+            FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
+            fos.write(String.format("hello (%04d)\n", i).getBytes());
+            fos.close();
+        }
+
+        final Set<String> fileNames = new HashSet<String>();
+        final CountDownLatch latch = new CountDownLatch(ITERATIONS);
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+                latch.countDown();
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0").to("mock:result");
+                from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0").to("mock:result");
+                from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0").to("mock:result");
+                from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedMessageCount(ITERATIONS);
+
+        latch.await(30, TimeUnit.SECONDS);
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(fileNames.size(), equalTo(ITERATIONS));
+    }
+
+    @Test
     public void testSimpleConsumerWithEmptyFile() throws Exception {
         if (!canTest()) {
             return;

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
index 1ee6255..5c3f7f8 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hdfs2.integration;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,36 @@ public class HdfsAppendTest extends CamelTestSupport {
         in.close();
     }
 
+    @Test
+    public void testAppendWithDynamicFileName() throws Exception {
+        int ITERATIONS = 10;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start1").to("hdfs2://localhost:9000/tmp/test-dynamic/?append=true&fileSystemType=HDFS");
+            }
+        });
+        startCamelContext();
+
+        for (int i = 0; i < ITERATIONS; ++i) {
+            template.sendBodyAndHeader("direct:start1", "HELLO", Exchange.FILE_NAME, "camel-hdfs2.log");
+        }
+
+        Configuration conf = new Configuration();
+        Path file = new Path("hdfs://localhost:9000/tmp/test-dynamic/camel-hdfs2.log");
+        FileSystem fs = FileSystem.get(file.toUri(), conf);
+        FSDataInputStream in = fs.open(file);
+        byte[] buffer = new byte[5];
+        for (int i = 0; i < ITERATIONS; ++i) {
+            assertEquals(5, in.read(buffer));
+            System.out.println("> " + new String(buffer));
+        }
+        int ret = in.read(buffer);
+        assertEquals(-1, ret);
+        in.close();
+    }
+
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
@@ -90,5 +121,7 @@ public class HdfsAppendTest extends CamelTestSupport {
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);
         fs.delete(dir, true);
+        dir = new Path("hdfs://localhost:9000/tmp/test-dynamic");
+        fs.delete(dir, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
index 5c04bba..c59efc2 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
@@ -16,15 +16,28 @@
  */
 package org.apache.camel.component.hdfs2.integration;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockComponent;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.After;
@@ -73,6 +86,51 @@ public class HdfsProducerConsumerIntegrationTest extends CamelTestSupport {
         assertThat(sent.isEmpty(), is(true));
     }
 
+    @Test
+    // see https://issues.apache.org/jira/browse/CAMEL-7318
+    public void testMultipleConsumers() throws Exception {
+        int ITERATIONS = 400;
+
+        Path p = new Path("hdfs://localhost:9000/tmp/test/multiple-consumers");
+        FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
+        fs.mkdirs(p);
+        for (int i = 1; i <= ITERATIONS; i++) {
+            FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)));
+            os.write(String.format("hello (%03d)\n", i).getBytes());
+            os.close();
+        }
+
+        final Set<String> fileNames = new HashSet<String>();
+        final CountDownLatch latch = new CountDownLatch(ITERATIONS);
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+                latch.countDown();
+            }
+        });
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                // difference in chunkSize only to allow multiple consumers
+                from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=128").to("mock:result");
+                from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=256").to("mock:result");
+                from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=512").to("mock:result");
+                from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=1024").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedMessageCount(ITERATIONS);
+
+        latch.await(30, TimeUnit.SECONDS);
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(fileNames.size(), equalTo(ITERATIONS));
+    }
+
     @Override
     @After
     public void tearDown() throws Exception {
@@ -83,6 +141,7 @@ public class HdfsProducerConsumerIntegrationTest extends CamelTestSupport {
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);
         fs.delete(dir, true);
+        fs.delete(new Path("hdfs://localhost:9000/tmp/test/multiple-consumers"), true);
     }
 
 }
\ No newline at end of file
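A note on the "difference in chunkSize" comment in the routes above: Camel resolves endpoints by URI, so four from() calls with byte-identical URIs would share a single endpoint instead of creating four competing consumers. Varying any option, chunkSize here, keeps each URI unique. The same route set could be written as a loop; a hedged sketch (hypothetical class name, not part of this commit):

    import org.apache.camel.builder.RouteBuilder;

    public class DistinctConsumerRoutes extends RouteBuilder {
        @Override
        public void configure() {
            // Each URI differs only in chunkSize, so Camel creates four
            // distinct endpoints and therefore four competing consumers.
            for (int chunkSize : new int[]{128, 256, 512, 1024}) {
                from("hdfs2://localhost:9000/tmp/test/multiple-consumers"
                        + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=" + chunkSize)
                    .to("mock:result");
            }
        }
    }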