You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/02/01 12:28:00 UTC
svn commit: r1441392 - in /camel/trunk/components/camel-hdfs/src:
main/java/org/apache/camel/component/hdfs/HdfsConstants.java
main/java/org/apache/camel/component/hdfs/HdfsProducer.java
test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
Author: davsclaus
Date: Fri Feb 1 11:28:00 2013
New Revision: 1441392
URL: http://svn.apache.org/viewvc?rev=1441392&view=rev
Log:
CAMEL-5971: Added a header to control whether the HDFS producer closes the stream after writing.
Modified:
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java Fri Feb 1 11:28:00 2013
@@ -44,6 +44,8 @@ public final class HdfsConstants {
public static final int DEFAULT_CHECK_IDLE_INTERVAL = 500;
+ public static final String HDFS_CLOSE = "CamelHdfsClose";
+
private HdfsConstants() {
}
}
Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Fri Feb 1 11:28:00 2013
@@ -187,9 +187,18 @@ public class HdfsProducer extends Defaul
idle.set(false);
+ // close if we do not have idle checker task to do this for us
+ boolean close = scheduler == null;
+ // but user may have a header to explicitly control the close
+ Boolean closeHeader = exchange.getIn().getHeader(HdfsConstants.HDFS_CLOSE, Boolean.class);
+ if (closeHeader != null) {
+ close = closeHeader;
+ }
+
// if no idle checker then we need to explicitly close the stream after usage
- if (scheduler == null) {
+ if (close) {
try {
+ HdfsProducer.this.log.trace("Closing stream");
ostream.close();
ostream = null;
} catch (IOException e) {
Modified: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java?rev=1441392&r1=1441391&r2=1441392&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java (original)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java Fri Feb 1 11:28:00 2013
@@ -40,7 +40,7 @@ import org.junit.Before;
import org.junit.Test;
public class HdfsProducerTest extends CamelTestSupport {
-
+
private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath());
//Hadoop doesn't run on IBM JDK
@@ -53,6 +53,7 @@ public class HdfsProducerTest extends Ca
}
super.setUp();
}
+
@Test
public void testProducer() throws Exception {
if (SKIP) {
@@ -71,6 +72,31 @@ public class HdfsProducerTest extends Ca
}
@Test
+ public void testProducerClose() throws Exception {
+ if (SKIP) {
+ return;
+ }
+ for (int i = 0; i < 10; ++i) {
+ // send 10 messages, and mark to close in last message
+ template.sendBodyAndHeader("direct:start1", "PAPPO" + i, HdfsConstants.HDFS_CLOSE, i == 9 ? true : false);
+ }
+
+ Configuration conf = new Configuration();
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
+ FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ int i = 0;
+ while (reader.next(key, value)) {
+ Text txt = (Text) value;
+ assertEquals("PAPPO" + i, txt.toString());
+ ++i;
+ }
+ }
+
+ @Test
public void testWriteBoolean() throws Exception {
if (SKIP) {
return;