You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2015/03/03 14:27:54 UTC

[1/3] camel git commit: [CAMEL-8430] Fix "readSuffix" usage in camel-hdfs2

Repository: camel
Updated Branches:
  refs/heads/master 2db1d8d1e -> 109d8ecb4


[CAMEL-8430] Fix "readSuffix" usage in camel-hdfs2


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f86bbd0e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f86bbd0e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f86bbd0e

Branch: refs/heads/master
Commit: f86bbd0e2ca5763deb4fb6122ad018786b3a7f03
Parents: 8a78e53
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Mar 3 14:22:54 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Mar 3 14:27:19 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hdfs2/HdfsInputStream.java  |  4 +-
 .../camel/component/hdfs2/HdfsConsumerTest.java | 53 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f86bbd0e/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
index 951e51e..8e761e1 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java
@@ -28,6 +28,7 @@ public class HdfsInputStream implements Closeable {
     private HdfsFileType fileType;
     private String actualPath;
     private String suffixedPath;
+    private String suffixedReadPath;
     private Closeable in;
     private boolean opened;
     private int chunkSize;
@@ -42,6 +43,7 @@ public class HdfsInputStream implements Closeable {
         ret.fileType = configuration.getFileType();
         ret.actualPath = hdfsPath;
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
+        ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
         HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
         info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
@@ -55,7 +57,7 @@ public class HdfsInputStream implements Closeable {
         if (opened) {
             IOUtils.closeStream(in);
             HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
-            info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
+            info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
             opened = false;
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f86bbd0e/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
index 8588030..48c155f 100644
--- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
+++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java
@@ -18,9 +18,19 @@ package org.apache.camel.component.hdfs2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +52,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.hadoop.io.SequenceFile.CompressionType;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 public class HdfsConsumerTest extends HdfsTestSupport {
 
@@ -97,6 +108,48 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testReadWithReadSuffix() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
+        Configuration conf = new Configuration();
+        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+        NullWritable keyWritable = NullWritable.get();
+        BooleanWritable valueWritable = new BooleanWritable();
+        valueWritable.set(true);
+        writer.append(keyWritable, valueWritable);
+        writer.sync();
+        writer.close();
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs2:///" + file.getParent().toUri() + "?consumerProperties=#cprops&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled").to("mock:result");
+            }
+        });
+        Map<String, Object> props = new HashMap<String, Object>();
+        ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1);
+        DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool);
+        props.put("scheduler", scheduler);
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("cprops", props);
+        context.start();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.assertIsSatisfied();
+
+        // synchronize on pool that was used to run hdfs consumer thread
+        scheduler.getScheduledExecutorService().shutdown();
+        scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+        Set<String> files = new HashSet<String>(Arrays.asList(new File("target/test").list()));
+        assertThat(files.size(), equalTo(2));
+        assertTrue(files.remove("test-camel-boolean.handled"));
+        assertTrue(files.remove(".test-camel-boolean.handled.crc"));
+    }
+
+    @Test
     public void testReadBoolean() throws Exception {
         if (!canTest()) {
             return;


[2/3] camel git commit: [CAMEL-8428] Move getEndpoint() methods to EndpointAware interface

Posted by gg...@apache.org.
[CAMEL-8428] Move getEndpoint() methods to EndpointAware interface


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a78e531
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a78e531
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a78e531

Branch: refs/heads/master
Commit: 8a78e5317b03e51c20d3de76afc99f01c3e68ebb
Parents: 2db1d8d
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Mar 3 10:26:43 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Mar 3 14:27:19 2015 +0100

----------------------------------------------------------------------
 camel-core/src/main/java/org/apache/camel/Consumer.java   |  9 +--------
 .../src/main/java/org/apache/camel/EndpointAware.java     |  4 +++-
 camel-core/src/main/java/org/apache/camel/Producer.java   |  7 -------
 camel-core/src/main/java/org/apache/camel/Route.java      |  7 -------
 .../main/java/org/apache/camel/impl/DefaultConsumer.java  |  2 +-
 .../src/main/java/org/apache/camel/spi/RouteContext.java  | 10 ++--------
 6 files changed, 7 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/Consumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Consumer.java b/camel-core/src/main/java/org/apache/camel/Consumer.java
index ee47729..3c4719b 100644
--- a/camel-core/src/main/java/org/apache/camel/Consumer.java
+++ b/camel-core/src/main/java/org/apache/camel/Consumer.java
@@ -21,12 +21,5 @@ package org.apache.camel;
  *
  * @version 
  */
-public interface Consumer extends Service {
-
-    /**
-     * Gets the endpoint this {@link Consumer} consumes from.
-     *
-     * @return the endpoint
-     */
-    Endpoint getEndpoint();
+public interface Consumer extends Service, EndpointAware {
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/EndpointAware.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/EndpointAware.java b/camel-core/src/main/java/org/apache/camel/EndpointAware.java
index 2d46fb1..4780aaf 100644
--- a/camel-core/src/main/java/org/apache/camel/EndpointAware.java
+++ b/camel-core/src/main/java/org/apache/camel/EndpointAware.java
@@ -22,7 +22,9 @@ package org.apache.camel;
 public interface EndpointAware {
 
     /**
-     * Gets the endpoint
+     * Gets the endpoint associated with an object.
+     * It's the endpoint for sending to for components like {@link org.apache.camel.Producer}
+     * or for consuming from for components like {@link org.apache.camel.Consumer} or {@link org.apache.camel.Route}
      *
      * @return the endpoint
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/Producer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Producer.java b/camel-core/src/main/java/org/apache/camel/Producer.java
index 9599a38..2b890bc 100644
--- a/camel-core/src/main/java/org/apache/camel/Producer.java
+++ b/camel-core/src/main/java/org/apache/camel/Producer.java
@@ -25,13 +25,6 @@ package org.apache.camel;
 public interface Producer extends Processor, Service, IsSingleton, EndpointAware {
 
     /**
-     * Gets the endpoint this producer sends to.
-     *
-     * @return the endpoint
-     */
-    Endpoint getEndpoint();
-
-    /**
      * Creates a new exchange to send to this endpoint
      * 
      * @return a newly created exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/Route.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Route.java b/camel-core/src/main/java/org/apache/camel/Route.java
index f9a8683..e53c42c 100644
--- a/camel-core/src/main/java/org/apache/camel/Route.java
+++ b/camel-core/src/main/java/org/apache/camel/Route.java
@@ -45,13 +45,6 @@ public interface Route extends EndpointAware {
     String getId();
 
     /**
-     * Gets the inbound endpoint
-     *
-     * @return the inbound endpoint
-     */
-    Endpoint getEndpoint();
-
-    /**
      * Gets the inbound {@link Consumer}
      *
      * @return the inbound consumer

http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index a0b55be..919766e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware, EndpointAware {
+public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     private final Endpoint endpoint;
     private final Processor processor;

http://git-wip-us.apache.org/repos/asf/camel/blob/8a78e531/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
index f9c615b..54e6ad0 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointAware;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeConfiguration;
 import org.apache.camel.model.FromDefinition;
@@ -32,14 +33,7 @@ import org.apache.camel.model.RouteDefinition;
  *
  * @version 
  */
-public interface RouteContext extends RuntimeConfiguration {
-
-    /**
-     * Gets the endpoint
-     *
-     * @return the endpoint
-     */
-    Endpoint getEndpoint();
+public interface RouteContext extends RuntimeConfiguration, EndpointAware {
 
     /**
      * Gets the from type


[3/3] camel git commit: [CAMEL-8430] Fix "readSuffix" usage in camel-hdfs

Posted by gg...@apache.org.
[CAMEL-8430] Fix "readSuffix" usage in camel-hdfs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/109d8ecb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/109d8ecb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/109d8ecb

Branch: refs/heads/master
Commit: 109d8ecb40db0455c6471fd197fcf1583a386ea4
Parents: f86bbd0
Author: Grzegorz Grzybek <gr...@gmail.com>
Authored: Tue Mar 3 14:26:48 2015 +0100
Committer: Grzegorz Grzybek <gr...@gmail.com>
Committed: Tue Mar 3 14:27:20 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsInputStream.java   |  4 +-
 .../camel/component/hdfs/HdfsConsumerTest.java  | 54 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 7e9d2c2..ac1bff9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -28,6 +28,7 @@ public class HdfsInputStream implements Closeable {
     private HdfsFileType fileType;
     private String actualPath;
     private String suffixedPath;
+    private String suffixedReadPath;
     private Closeable in;
     private boolean opened;
     private int chunkSize;
@@ -42,6 +43,7 @@ public class HdfsInputStream implements Closeable {
         ret.fileType = configuration.getFileType();
         ret.actualPath = hdfsPath;
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
+        ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
         HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
         info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
@@ -55,7 +57,7 @@ public class HdfsInputStream implements Closeable {
         if (opened) {
             IOUtils.closeStream(in);
             HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
-            info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX));
+            info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
             opened = false;
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 33a8f6d..368b1d8 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -17,9 +17,19 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +51,7 @@ import org.junit.Test;
 
 import static org.apache.hadoop.io.SequenceFile.CompressionType;
 import static org.apache.hadoop.io.SequenceFile.createWriter;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 public class HdfsConsumerTest extends HdfsTestSupport {
 
@@ -96,6 +107,49 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testReadWithReadSuffix() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs1 = FileSystem.get(file.toUri(), conf);
+        SequenceFile.Writer writer = createWriter(fs1, conf, file, NullWritable.class, BooleanWritable.class);
+        NullWritable keyWritable = NullWritable.get();
+        BooleanWritable valueWritable = new BooleanWritable();
+        valueWritable.set(true);
+        writer.append(keyWritable, valueWritable);
+        writer.sync();
+        writer.close();
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs:///" + file.getParent().toUri() + "?consumerProperties=#cprops&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled").to("mock:result");
+            }
+        });
+        Map<String, Object> props = new HashMap<String, Object>();
+        ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1);
+        DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool);
+        props.put("scheduler", scheduler);
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("cprops", props);
+        context.start();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.assertIsSatisfied();
+
+        // synchronize on pool that was used to run hdfs consumer thread
+        scheduler.getScheduledExecutorService().shutdown();
+        scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+        Set<String> files = new HashSet<String>(Arrays.asList(new File("target/test").list()));
+        assertThat(files.size(), equalTo(2));
+        assertTrue(files.remove("test-camel-boolean.handled"));
+        assertTrue(files.remove(".test-camel-boolean.handled.crc"));
+    }
+
+    @Test
     public void testReadBoolean() throws Exception {
         if (!canTest()) {
             return;