You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this rendering.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/05 13:39:29 UTC
camel git commit: [CAMEL-7318] Improve hdfs:// consumer concurrency
Repository: camel
Updated Branches:
refs/heads/master f13ed8e9b -> 4028e2b33
[CAMEL-7318] Improve hdfs:// consumer concurrency
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4028e2b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4028e2b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4028e2b3
Branch: refs/heads/master
Commit: 4028e2b33b7f424e82fe6756c500d7d01a9ce77b
Parents: f13ed8e
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Thu Mar 5 13:37:10 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Thu Mar 5 13:37:10 2015 +0100
----------------------------------------------------------------------
.../camel/component/hdfs/HdfsConsumer.java | 6 ++
.../camel/component/hdfs/HdfsInputStream.java | 13 ++++-
.../camel/component/hdfs/HdfsConsumerTest.java | 49 ++++++++++++++++
.../camel/component/hdfs2/HdfsConsumer.java | 6 ++
.../camel/component/hdfs2/HdfsInputStream.java | 13 ++++-
.../camel/component/hdfs2/HdfsConsumerTest.java | 49 ++++++++++++++++
.../hdfs2/integration/HdfsAppendTest.java | 33 +++++++++++
.../HdfsProducerConsumerIntegrationTest.java | 59 ++++++++++++++++++++
8 files changed, 222 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index f718238..a8d09db 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -138,6 +138,12 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
try {
this.rwlock.writeLock().lock();
this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+ if (!this.istream.isOpened()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath().toString());
+ }
+ continue;
+ }
} finally {
this.rwlock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 24342f6..e0b1562 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -46,9 +46,12 @@ public class HdfsInputStream implements Closeable {
ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
ret.chunkSize = configuration.getChunkSize();
HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
- info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
- ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
- ret.opened = true;
+ if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
+ ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
+ ret.opened = true;
+ } else {
+ ret.opened = false;
+ }
return ret;
}
@@ -98,4 +101,8 @@ public class HdfsInputStream implements Closeable {
return in;
}
+ /**
+ * Reports the {@code opened} flag. {@link #createInputStream} sets it to {@code true} only when
+ * the source file was renamed to its suffixed path and an input stream was created on it.
+ *
+ * @return whether this stream was successfully opened
+ */
+ public boolean isOpened() {
+ return this.opened;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 294fae2..5478df8 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -18,14 +18,18 @@ package org.apache.camel.component.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
@@ -108,6 +112,51 @@ public class HdfsConsumerTest extends HdfsTestSupport {
}
@Test
+ public void testConcurrentConsumers() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ // Four consumers poll the same directory; every file must be delivered exactly once overall.
+ final int iterations = 200;
+
+ final File dir = new File("target/test/multiple-consumers");
+ dir.mkdirs();
+ for (int i = 1; i <= iterations; i++) {
+ FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
+ try {
+ fos.write(String.format("hello (%04d)\n", i).getBytes("UTF-8"));
+ } finally {
+ // close even if write fails, so the file handle is not leaked
+ fos.close();
+ }
+ }
+
+ // Several consumers may invoke the processor below concurrently; a plain HashSet is not
+ // thread-safe and could lose entries, making the size assertion flaky.
+ final Set<String> fileNames = java.util.Collections.synchronizedSet(new HashSet<String>());
+ final CountDownLatch latch = new CountDownLatch(iterations);
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+ latch.countDown();
+ }
+ });
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ // the URIs differ only in chunkSize, so that four separate consumers are created
+ from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0").to("mock:result");
+ from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0").to("mock:result");
+ from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0").to("mock:result");
+ from("hdfs://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.expectedMessageCount(iterations);
+
+ latch.await(30, TimeUnit.SECONDS);
+
+ resultEndpoint.assertIsSatisfied();
+ // distinct names == iterations: no file was delivered twice or skipped
+ assertThat(fileNames.size(), equalTo(iterations));
+ }
+
+ @Test
public void testSimpleConsumerWithEmptyFile() throws Exception {
if (!canTest()) {
return;
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
index 28192fa..395f00b 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java
@@ -138,6 +138,12 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
try {
this.rwlock.writeLock().lock();
this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+ if (!this.istream.isOpened()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath().toString());
+ }
+ continue;
+ }
} finally {
this.rwlock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
index 50970dd..b38beaf 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -46,9 +46,12 @@ public class HdfsInputStream implements Closeable {
ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
ret.chunkSize = configuration.getChunkSize();
HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
- info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
- ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
- ret.opened = true;
+ if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
+ ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
+ ret.opened = true;
+ } else {
+ ret.opened = false;
+ }
return ret;
}
@@ -104,4 +107,8 @@ public class HdfsInputStream implements Closeable {
return in;
}
+ /**
+ * Reports the {@code opened} flag. {@link #createInputStream} sets it to {@code true} only when
+ * the source file was renamed to its suffixed path and an input stream was created on it.
+ *
+ * @return whether this stream was successfully opened
+ */
+ public boolean isOpened() {
+ return this.opened;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
index b9253b7..55f7377 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
@@ -18,15 +18,19 @@ package org.apache.camel.component.hdfs2;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
@@ -109,6 +113,51 @@ public class HdfsConsumerTest extends HdfsTestSupport {
}
@Test
+ public void testConcurrentConsumers() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ // Four consumers poll the same directory; every file must be delivered exactly once overall.
+ final int iterations = 200;
+
+ final File dir = new File("target/test/multiple-consumers");
+ dir.mkdirs();
+ for (int i = 1; i <= iterations; i++) {
+ FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
+ try {
+ fos.write(String.format("hello (%04d)\n", i).getBytes("UTF-8"));
+ } finally {
+ // close even if write fails, so the file handle is not leaked
+ fos.close();
+ }
+ }
+
+ // Several consumers may invoke the processor below concurrently; a plain HashSet is not
+ // thread-safe and could lose entries, making the size assertion flaky.
+ final Set<String> fileNames = java.util.Collections.synchronizedSet(new HashSet<String>());
+ final CountDownLatch latch = new CountDownLatch(iterations);
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+ latch.countDown();
+ }
+ });
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ // the URIs differ only in chunkSize, so that four separate consumers are created
+ from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0").to("mock:result");
+ from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0").to("mock:result");
+ from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0").to("mock:result");
+ from("hdfs2://" + dir.toURI() + "/?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.expectedMessageCount(iterations);
+
+ latch.await(30, TimeUnit.SECONDS);
+
+ resultEndpoint.assertIsSatisfied();
+ // distinct names == iterations: no file was delivered twice or skipped
+ assertThat(fileNames.size(), equalTo(iterations));
+ }
+
+ @Test
public void testSimpleConsumerWithEmptyFile() throws Exception {
if (!canTest()) {
return;
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
index 1ee6255..5c3f7f8 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsAppendTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hdfs2.integration;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,36 @@ public class HdfsAppendTest extends CamelTestSupport {
in.close();
}
+ @Test
+ public void testAppendWithDynamicFileName() throws Exception {
+ // Sends several "HELLO" bodies to one file name supplied via the Exchange.FILE_NAME header,
+ // with append=true, then reads the file back and checks every appended chunk.
+ final int iterations = 10;
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start1").to("hdfs2://localhost:9000/tmp/test-dynamic/?append=true&fileSystemType=HDFS");
+ }
+ });
+ startCamelContext();
+
+ for (int i = 0; i < iterations; ++i) {
+ template.sendBodyAndHeader("direct:start1", "HELLO", Exchange.FILE_NAME, "camel-hdfs2.log");
+ }
+
+ Configuration conf = new Configuration();
+ Path file = new Path("hdfs://localhost:9000/tmp/test-dynamic/camel-hdfs2.log");
+ FileSystem fs = FileSystem.get(file.toUri(), conf);
+ FSDataInputStream in = fs.open(file);
+ try {
+ byte[] buffer = new byte[5];
+ for (int i = 0; i < iterations; ++i) {
+ assertEquals(5, in.read(buffer));
+ // assert the content of each chunk rather than just printing it
+ assertEquals("HELLO", new String(buffer, "UTF-8"));
+ }
+ // nothing beyond the appended bodies
+ assertEquals(-1, in.read(buffer));
+ } finally {
+ // close even when an assertion fails
+ in.close();
+ }
+ }
+
@Override
public void tearDown() throws Exception {
super.tearDown();
@@ -90,5 +121,7 @@ public class HdfsAppendTest extends CamelTestSupport {
Path dir = new Path("hdfs://localhost:9000/tmp/test");
FileSystem fs = FileSystem.get(dir.toUri(), conf);
fs.delete(dir, true);
+ dir = new Path("hdfs://localhost:9000/tmp/test-dynamic");
+ fs.delete(dir, true);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4028e2b3/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
index 5c04bba..c59efc2 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/integration/HdfsProducerConsumerIntegrationTest.java
@@ -16,15 +16,28 @@
*/
package org.apache.camel.component.hdfs2.integration;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockComponent;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
@@ -73,6 +86,51 @@ public class HdfsProducerConsumerIntegrationTest extends CamelTestSupport {
assertThat(sent.isEmpty(), is(true));
}
+ @Test
+ // see https://issues.apache.org/jira/browse/CAMEL-7318
+ public void testMultipleConsumers() throws Exception {
+ // Four consumers poll the same HDFS directory; every file must be delivered exactly once overall.
+ final int iterations = 400;
+
+ Path p = new Path("hdfs://localhost:9000/tmp/test/multiple-consumers");
+ FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
+ fs.mkdirs(p);
+ for (int i = 1; i <= iterations; i++) {
+ FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)));
+ try {
+ os.write(String.format("hello (%03d)\n", i).getBytes("UTF-8"));
+ } finally {
+ // close even if write fails, so the HDFS stream is not leaked
+ os.close();
+ }
+ }
+
+ // Several consumers may invoke the processor below concurrently; a plain HashSet is not
+ // thread-safe and could lose entries, making the size assertion flaky.
+ final Set<String> fileNames = java.util.Collections.synchronizedSet(new HashSet<String>());
+ final CountDownLatch latch = new CountDownLatch(iterations);
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ fileNames.add(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+ latch.countDown();
+ }
+ });
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ // difference in chunkSize only to allow multiple consumers
+ from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=128").to("mock:result");
+ from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=256").to("mock:result");
+ from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=512").to("mock:result");
+ from("hdfs2://localhost:9000/tmp/test/multiple-consumers?pattern=*.txt&fileSystemType=HDFS&chunkSize=1024").to("mock:result");
+ }
+ });
+ context.start();
+
+ resultEndpoint.expectedMessageCount(iterations);
+
+ latch.await(30, TimeUnit.SECONDS);
+
+ resultEndpoint.assertIsSatisfied();
+ // distinct names == iterations: no file was delivered twice or skipped
+ assertThat(fileNames.size(), equalTo(iterations));
+ }
+
@Override
@After
public void tearDown() throws Exception {
@@ -83,6 +141,7 @@ public class HdfsProducerConsumerIntegrationTest extends CamelTestSupport {
Path dir = new Path("hdfs://localhost:9000/tmp/test");
FileSystem fs = FileSystem.get(dir.toUri(), conf);
fs.delete(dir, true);
+ fs.delete(new Path("hdfs://localhost:9000/tmp/test/multiple-consumers"), true);
}
}
\ No newline at end of file