You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/10/17 18:44:20 UTC

git commit: CAMEL-6028 added CamelFileName Expression support to camel-hdfs producer

Updated Branches:
  refs/heads/master 23d9d028f -> 4c3d1526c


CAMEL-6028 added CamelFileName Expression support to camel-hdfs producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4c3d1526
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4c3d1526
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4c3d1526

Branch: refs/heads/master
Commit: 4c3d1526c9d285e138a16b9fd275171f4255ec65
Parents: 23d9d02
Author: boday <bo...@apache.org>
Authored: Thu Oct 17 09:41:05 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Thu Oct 17 09:41:05 2013 -0700

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsProducer.java      | 21 ++++++++++++++--
 .../camel/component/hdfs/HdfsProducerTest.java  | 25 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4c3d1526/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 11a73e0..8ac00eb 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.security.auth.login.Configuration;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.IOHelper;
 
@@ -185,8 +186,7 @@ public class HdfsProducer extends DefaultProducer {
             if (ostream != null) {
                 IOHelper.close(ostream, "output stream", log);
             }
-            StringBuilder actualPath = new StringBuilder(hdfsPath);
-            actualPath.append(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
+            StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange);
             ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
         } else if (ostream == null) {
             // must have ostream
@@ -235,6 +235,23 @@ public class HdfsProducer extends DefaultProducer {
         log.debug("Wrote body to hdfs-file {}", path);
     }
 
+    /**
+     * helper method to construct the hdfsPath from the CamelFileName String or Expression
+     * @param exchange
+     * @return
+     */
+    private StringBuilder getHdfsPathUsingFileNameHeader(Exchange exchange) {
+        StringBuilder actualPath = new StringBuilder(hdfsPath);
+        String fileName = "";
+        Object value = exchange.getIn().getHeader(Exchange.FILE_NAME);
+        if (value instanceof String) {
+            fileName = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
+        } else if (value instanceof Expression) {
+            fileName =  ((Expression) value).evaluate(exchange, String.class);
+        }
+        return actualPath.append(fileName);
+    }
+
     private StringBuilder newFileName() {
         StringBuilder actualPath = new StringBuilder(hdfsPath);
         actualPath.append(splitNum);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c3d1526/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 937958b..a219190 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.camel.language.simple.SimpleLanguage.simple;
+
 public class HdfsProducerTest extends HdfsTestSupport {
 
     private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath());
@@ -357,6 +359,29 @@ public class HdfsProducerTest extends HdfsTestSupport {
         }
     }
 
+    @Test
+    public void testWriteTextWithDynamicFilenameExpression() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        for (int i = 0; i < 5; i++) {
+            template.sendBodyAndHeader("direct:write_dynamic_filename", "CIAO" + i, Exchange.FILE_NAME, simple("file-${body}"));
+        }
+
+        for (int i = 0; i < 5; i++) {
+            InputStream in = null;
+            try {
+                in = new URL("file:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/file-CIAO" + i).openStream();
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                IOUtils.copyBytes(in, bos, 4096, false);
+                assertEquals("CIAO" + i, new String(bos.toByteArray()));
+            } finally {
+                IOHelper.close(in);
+            }
+        }
+    }
+
     @Override
     public void tearDown() throws Exception {
         if (!canTest()) {