You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/02/01 12:28:00 UTC

svn commit: r1441392 - in /camel/trunk/components/camel-hdfs/src: main/java/org/apache/camel/component/hdfs/HdfsConstants.java main/java/org/apache/camel/component/hdfs/HdfsProducer.java test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java

Author: davsclaus
Date: Fri Feb  1 11:28:00 2013
New Revision: 1441392

URL: http://svn.apache.org/viewvc?rev=1441392&view=rev
Log:
CAMEL-5971: Added header to control if hdfs producer should close stream after writing.

Modified:
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java Fri Feb  1 11:28:00 2013
@@ -44,6 +44,8 @@ public final class HdfsConstants {
 
     public static final int DEFAULT_CHECK_IDLE_INTERVAL = 500;
 
+    public static final String HDFS_CLOSE = "CamelHdfsClose";
+
     private HdfsConstants() {
     }
 }

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Fri Feb  1 11:28:00 2013
@@ -187,9 +187,18 @@ public class HdfsProducer extends Defaul
 
         idle.set(false);
 
+        // close if we do not have idle checker task to do this for us
+        boolean close = scheduler == null;
+        // but user may have a header to explicitly control the close
+        Boolean closeHeader = exchange.getIn().getHeader(HdfsConstants.HDFS_CLOSE, Boolean.class);
+        if (closeHeader != null) {
+            close = closeHeader;
+        }
+
         // if no idle checker then we need to explicit close the stream after usage
-        if (scheduler == null) {
+        if (close) {
             try {
+                HdfsProducer.this.log.trace("Closing stream");
                 ostream.close();
                 ostream = null;
             } catch (IOException e) {

Modified: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java (original)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java Fri Feb  1 11:28:00 2013
@@ -40,7 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class HdfsProducerTest extends CamelTestSupport {
-    
+
     private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath());
 
     //Hadoop doesn't run on IBM JDK
@@ -53,6 +53,7 @@ public class HdfsProducerTest extends Ca
         }
         super.setUp();
     }
+
     @Test
     public void testProducer() throws Exception {
         if (SKIP) {
@@ -71,6 +72,31 @@ public class HdfsProducerTest extends Ca
     }
 
     @Test
+    public void testProducerClose() throws Exception {
+        if (SKIP) {
+            return;
+        }
+        for (int i = 0; i < 10; ++i) {
+            // send 10 messages, and mark to close in last message
+            template.sendBodyAndHeader("direct:start1", "PAPPO" + i, HdfsConstants.HDFS_CLOSE, i == 9 ? true : false);
+        }
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+        FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+        int i = 0;
+        while (reader.next(key, value)) {
+            Text txt = (Text) value;
+            assertEquals("PAPPO" + i, txt.toString());
+            ++i;
+        }
+    }
+
+    @Test
     public void testWriteBoolean() throws Exception {
         if (SKIP) {
             return;