You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/10/17 07:55:35 UTC
git commit: CAMEL-6028 add CamelFileName support to camel-hdfs
producer
Updated Branches:
refs/heads/master ce0091e22 -> de4cb2b98
CAMEL-6028 add CamelFileName support to camel-hdfs producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/de4cb2b9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/de4cb2b9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/de4cb2b9
Branch: refs/heads/master
Commit: de4cb2b98259fb7c9777e8c0ca79f841956b9994
Parents: ce0091e
Author: boday <bo...@apache.org>
Authored: Wed Oct 16 22:50:54 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Oct 16 22:50:54 2013 -0700
----------------------------------------------------------------------
.../camel/component/hdfs/HdfsProducer.java | 12 ++-
.../camel/component/hdfs/HdfsProducerTest.java | 86 +++++++++++++-------
2 files changed, 68 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 2c3c0f5..11a73e0 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -180,8 +180,16 @@ public class HdfsProducer extends DefaultProducer {
Object body = exchange.getIn().getBody();
Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
- // must have ostream
- if (ostream == null) {
+ // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath
+ if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) {
+ if (ostream != null) {
+ IOHelper.close(ostream, "output stream", log);
+ }
+ StringBuilder actualPath = new StringBuilder(hdfsPath);
+ actualPath.append(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+ ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+ } else if (ostream == null) {
+ // must have ostream
ostream = setupHdfs(false);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 09f67a2..937958b 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.camel.component.hdfs;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.IOHelper;
import org.apache.hadoop.conf.Configuration;
@@ -29,6 +33,7 @@ import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
@@ -59,7 +64,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:start1", "PAPPO");
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -81,7 +86,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
}
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -106,7 +111,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_boolean", aBoolean);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-boolean");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-boolean");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -127,7 +132,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_byte", aByte);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-byte");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-byte");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -148,7 +153,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_int", anInt);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-int");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-int");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -169,7 +174,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_float", aFloat);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-float");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-float");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -190,7 +195,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_double", aDouble);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-double");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-double");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -211,7 +216,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_long", aLong);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-long");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-long");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -232,7 +237,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_text1", txt);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text1");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text1");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -254,7 +259,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBodyAndHeader("direct:write_text2", txtValue, "KEY", txtKey);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text2");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text2");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -276,9 +281,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text3");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
- MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text3", conf);
+ MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text3", conf);
Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.next(key, value);
@@ -297,9 +302,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBody("direct:write_text4", txtValue);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text4");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text4");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
- ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text4", conf);
+ ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text4", conf);
Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.next(value);
assertEquals(value.toString(), txtValue);
@@ -317,9 +322,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
template.sendBodyAndHeader("direct:write_text5", txtValue, "KEY", txtKey);
Configuration conf = new Configuration();
- Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text5");
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text5");
FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
- BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text5", conf);
+ BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text5", conf);
Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.next(key, value);
@@ -329,6 +334,29 @@ public class HdfsProducerTest extends HdfsTestSupport {
IOHelper.close(reader);
}
+ @Test
+ public void testWriteTextWithDynamicFilename() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ for (int i = 0; i < 5; i++) {
+ template.sendBodyAndHeader("direct:write_dynamic_filename", "CIAO" + i, Exchange.FILE_NAME, "file" + i);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ InputStream in = null;
+ try {
+ in = new URL("file:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/file" + i).openStream();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ IOUtils.copyBytes(in, bos, 4096, false);
+ assertEquals("CIAO" + i, new String(bos.toByteArray()));
+ } finally {
+ IOHelper.close(in);
+ }
+ }
+ }
+
@Override
public void tearDown() throws Exception {
if (!canTest()) {
@@ -349,33 +377,35 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Override
public void configure() throws Exception {
- from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
+ from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
/* For testing writables */
- from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE");
+ from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE");
- from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE");
+ from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE");
- from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");
+ from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");
- from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE");
+ from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE");
- from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE");
+ from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE");
- from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE");
+ from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE");
- from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
+ from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
/* For testing key and value writing */
- from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE");
+ from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE");
- from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE");
+ from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE");
/* For testing ArrayFile */
- from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE");
+ from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE");
/* For testing BloomMapFile */
- from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE");
+ from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE");
+
+ from("direct:write_dynamic_filename").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/?fileSystemType=LOCAL&valueType=TEXT");
}
};
}