You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/10/17 07:55:35 UTC

git commit: CAMEL-6028 add CamelFileName support to camel-hdfs producer

Updated Branches:
  refs/heads/master ce0091e22 -> de4cb2b98


CAMEL-6028 add CamelFileName support to camel-hdfs producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/de4cb2b9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/de4cb2b9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/de4cb2b9

Branch: refs/heads/master
Commit: de4cb2b98259fb7c9777e8c0ca79f841956b9994
Parents: ce0091e
Author: boday <bo...@apache.org>
Authored: Wed Oct 16 22:50:54 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Wed Oct 16 22:50:54 2013 -0700

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsProducer.java      | 12 ++-
 .../camel/component/hdfs/HdfsProducerTest.java  | 86 +++++++++++++-------
 2 files changed, 68 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 2c3c0f5..11a73e0 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -180,8 +180,16 @@ public class HdfsProducer extends DefaultProducer {
         Object body = exchange.getIn().getBody();
         Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
 
-        // must have ostream
-        if (ostream == null) {
+        // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath
+        if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) {
+            if (ostream != null) {
+                IOHelper.close(ostream, "output stream", log);
+            }
+            StringBuilder actualPath = new StringBuilder(hdfsPath);
+            actualPath.append(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+            ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+        } else if (ostream == null) {
+            // must have ostream
             ostream = setupHdfs(false);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 09f67a2..937958b 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +33,7 @@ import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -59,7 +64,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:start1", "PAPPO");
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -81,7 +86,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         }
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -106,7 +111,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_boolean", aBoolean);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-boolean");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-boolean");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -127,7 +132,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_byte", aByte);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-byte");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-byte");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -148,7 +153,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_int", anInt);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-int");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-int");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -169,7 +174,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_float", aFloat);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-float");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-float");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -190,7 +195,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_double", aDouble);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-double");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-double");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -211,7 +216,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_long", aLong);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-long");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-long");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -232,7 +237,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_text1", txt);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text1");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -254,7 +259,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBodyAndHeader("direct:write_text2", txtValue, "KEY", txtKey);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text2");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text2");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
@@ -276,9 +281,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text3");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
-        MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text3", conf);
+        MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text3", conf);
         Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         reader.next(key, value);
@@ -297,9 +302,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBody("direct:write_text4", txtValue);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text4");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text4");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
-        ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text4", conf);
+        ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text4", conf);
         Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         reader.next(value);
         assertEquals(value.toString(), txtValue);
@@ -317,9 +322,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
         template.sendBodyAndHeader("direct:write_text5", txtValue, "KEY", txtKey);
 
         Configuration conf = new Configuration();
-        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text5");
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text5");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
-        BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text5", conf);
+        BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text5", conf);
         Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         reader.next(key, value);
@@ -329,6 +334,29 @@ public class HdfsProducerTest extends HdfsTestSupport {
         IOHelper.close(reader);
     }
 
+    @Test
+    public void testWriteTextWithDynamicFilename() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        for (int i = 0; i < 5; i++) {
+            template.sendBodyAndHeader("direct:write_dynamic_filename", "CIAO" + i, Exchange.FILE_NAME, "file" + i);
+        }
+
+        for (int i = 0; i < 5; i++) {
+            InputStream in = null;
+            try {
+                in = new URL("file:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/file" + i).openStream();
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                IOUtils.copyBytes(in, bos, 4096, false);
+                assertEquals("CIAO" + i, new String(bos.toByteArray()));
+            } finally {
+                IOHelper.close(in);
+            }
+        }
+    }
+
     @Override
     public void tearDown() throws Exception {
         if (!canTest()) {
@@ -349,33 +377,35 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
             @Override
             public void configure() throws Exception {
-                from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
+                from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
 
                 /* For testing writables */
-                from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE");
+                from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE");
 
-                from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE");
+                from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE");
 
-                from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");
+                from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");
 
-                from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE");
+                from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE");
 
-                from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE");
+                from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE");
 
-                from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE");
+                from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE");
 
-                from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
+                from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE");
 
                 /* For testing key and value writing */
-                from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE");
+                from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE");
 
-                from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE");
+                from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE");
 
                 /* For testing ArrayFile */
-                from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE");
+                from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE");
 
                 /* For testing BloomMapFile */
-                from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE");
+                from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE");
+
+                from("direct:write_dynamic_filename").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/?fileSystemType=LOCAL&valueType=TEXT");
             }
         };
     }