You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/03 14:27:56 UTC
[3/3] camel git commit: [CAMEL-8430] Fix "readSuffix" usage in camel-hdfs
[CAMEL-8430] Fix "readSuffix" usage in camel-hdfs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/109d8ecb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/109d8ecb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/109d8ecb
Branch: refs/heads/master
Commit: 109d8ecb40db0455c6471fd197fcf1583a386ea4
Parents: f86bbd0
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Mar 3 14:26:48 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Mar 3 14:27:20 2015 +0100
----------------------------------------------------------------------
.../camel/component/hdfs/HdfsInputStream.java | 4 +-
.../camel/component/hdfs/HdfsConsumerTest.java | 54 ++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 7e9d2c2..ac1bff9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -28,6 +28,7 @@ public class HdfsInputStream implements Closeable {
private HdfsFileType fileType;
private String actualPath;
private String suffixedPath;
+ private String suffixedReadPath;
private Closeable in;
private boolean opened;
private int chunkSize;
@@ -42,6 +43,7 @@ public class HdfsInputStream implements Closeable {
ret.fileType = configuration.getFileType();
ret.actualPath = hdfsPath;
ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
+ ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
ret.chunkSize = configuration.getChunkSize();
HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
@@ -55,7 +57,7 @@ public class HdfsInputStream implements Closeable {
if (opened) {
IOUtils.closeStream(in);
HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
- info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
+ info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
opened = false;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 33a8f6d..368b1d8 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -17,9 +17,19 @@
package org.apache.camel.component.hdfs;
import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +51,7 @@ import org.junit.Test;
import static org.apache.hadoop.io.SequenceFile.CompressionType;
import static org.apache.hadoop.io.SequenceFile.createWriter;
+import static org.hamcrest.CoreMatchers.equalTo;
public class HdfsConsumerTest extends HdfsTestSupport {
@@ -96,6 +107,49 @@ public class HdfsConsumerTest extends HdfsTestSupport {
}
@Test
+ public void testReadWithReadSuffix() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
+ Configuration conf = new Configuration();
+ FileSystem fs1 = FileSystem.get(file.toUri(), conf);
+ SequenceFile.Writer writer = createWriter(fs1, conf, file, NullWritable.class, BooleanWritable.class);
+ NullWritable keyWritable = NullWritable.get();
+ BooleanWritable valueWritable = new BooleanWritable();
+ valueWritable.set(true);
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ writer.close();
+
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ from("hdfs:///" + file.getParent().toUri() + "?consumerProperties=#cprops&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled").to("mock:result");
+ }
+ });
+ Map<String, Object> props = new HashMap<String, Object>();
+ ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1);
+ DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool);
+ props.put("scheduler", scheduler);
+ ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("cprops", props);
+ context.start();
+
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.assertIsSatisfied();
+
+ // synchronize on pool that was used to run hdfs consumer thread
+ scheduler.getScheduledExecutorService().shutdown();
+ scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+ Set<String> files = new HashSet<String>(Arrays.asList(new File("target/test").list()));
+ assertThat(files.size(), equalTo(2));
+ assertTrue(files.remove("test-camel-boolean.handled"));
+ assertTrue(files.remove(".test-camel-boolean.handled.crc"));
+ }
+
+ @Test
public void testReadBoolean() throws Exception {
if (!canTest()) {
return;