You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/10/19 00:51:01 UTC
git commit: CAMEL-6867 reworked the camel-hdfs split mode filenames to use UUID generator
Updated Branches:
refs/heads/master 170c411e2 -> cd82ef8b3
CAMEL-6867 reworked the camel-hdfs split mode filenames to use UUID generator
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd82ef8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd82ef8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd82ef8b
Branch: refs/heads/master
Commit: cd82ef8b32f90cda1fff6fa118abf902e18371d1
Parents: 170c411
Author: boday <bo...@apache.org>
Authored: Fri Oct 18 14:25:07 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Fri Oct 18 14:25:52 2013 -0700
----------------------------------------------------------------------
.../component/hdfs/HdfsFileSystemType.java | 2 -
.../camel/component/hdfs/HdfsProducer.java | 5 +--
.../hdfs/HdfsProducerConsumerTest.java | 11 +++---
.../component/hdfs/HdfsProducerSplitTest.java | 41 ++++++++------------
4 files changed, 25 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
index 97dcff2..891c4eb 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
@@ -26,7 +26,6 @@ public enum HdfsFileSystemType {
hpath.append(config.getPath());
if (config.getSplitStrategies().size() > 0) {
hpath.append('/');
- hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX);
}
return hpath;
}
@@ -43,7 +42,6 @@ public enum HdfsFileSystemType {
hpath.append(config.getPath());
if (config.getSplitStrategies().size() > 0) {
hpath.append('/');
- hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX);
}
return hpath;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 8ac00eb..4f29e1b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StringHelper;
public class HdfsProducer extends DefaultProducer {
@@ -36,7 +37,6 @@ public class HdfsProducer extends DefaultProducer {
private final AtomicBoolean idle = new AtomicBoolean(false);
private volatile ScheduledExecutorService scheduler;
private volatile HdfsOutputStream ostream;
- private long splitNum;
public static final class SplitStrategy {
private SplitStrategyType type;
@@ -254,8 +254,7 @@ public class HdfsProducer extends DefaultProducer {
private StringBuilder newFileName() {
StringBuilder actualPath = new StringBuilder(hdfsPath);
- actualPath.append(splitNum);
- splitNum++;
+ actualPath.append(StringHelper.sanitize(getEndpoint().getCamelContext().getUuidGenerator().generateUuid()));
return actualPath;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
index b169073..4f93257 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.hdfs;
import java.io.File;
+import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
@@ -55,13 +56,15 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
@Override
public void configure() {
from("direct:start").to("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&splitStrategy=BYTES:5,IDLE:1000");
- from("hdfs:///" + file.toUri() + "?pattern=seg*&initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result");
+ from("hdfs:///" + file.toUri() + "?initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result");
}
});
context.start();
+ List<String> expectedResults = new ArrayList<String>();
for (int i = 0; i < 10; ++i) {
template.sendBody("direct:start", "CIAO" + i);
+ expectedResults.add("CIAO" + i);
}
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
@@ -69,11 +72,9 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
resultEndpoint.expectedMessageCount(10);
resultEndpoint.assertIsSatisfied();
- int i = 0;
List<Exchange> exchanges = resultEndpoint.getExchanges();
- for (Exchange exchange : exchanges) {
- assertEquals("CIAO" + i++, exchange.getIn().getBody(String.class));
- }
+ assertEquals(10, exchanges.size());
+ resultEndpoint.expectedBodiesReceivedInAnyOrder(expectedResults);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
index 44bb590..7496a11 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
@@ -16,16 +16,15 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
+import java.io.InputStreamReader;
import org.apache.camel.builder.RouteBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
@@ -65,16 +64,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
// stop Camel to flush and close file stream
stopCamelContext();
- for (int i = 0; i < 3; ++i) {
- InputStream in = null;
- try {
- in = new URL("file:///" + BASE_FILE.toUri() + "3/" + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- IOUtils.copyBytes(in, bos, 4096, false);
- assertEquals("CIAO" + i, new String(bos.toByteArray()));
- } finally {
- IOUtils.closeStream(in);
- }
+ FileSystem fs = FileSystem.get(new Configuration());
+ FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + "3"));
+ assertEquals(3, status.length);
+ for (int i = 0; i < 3; i++) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+ assertTrue(br.readLine().startsWith("CIAO"));
+ assertNull(br.readLine());
}
}
@@ -98,16 +94,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
}
stopCamelContext();
- for (int i = 0; i < 10; ++i) {
- InputStream in = null;
- try {
- in = new URL("file:///" + BASE_FILE.toUri() + routeNr + '/' + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- IOUtils.copyBytes(in, bos, 4096, false);
- assertEquals("CIAO" + i, new String(bos.toByteArray()));
- } finally {
- IOUtils.closeStream(in);
- }
+ FileSystem fs = FileSystem.get(new Configuration());
+ FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + routeNr));
+ assertEquals(10, status.length);
+ for (int i = 0; i < status.length; i++) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+ assertTrue(br.readLine().startsWith("CIAO"));
+ assertNull(br.readLine());
}
}