You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2013/10/19 00:51:01 UTC

git commit: CAMEL-6867 reworked the camel-hdfs split mode filenames to use UUID generator

Updated Branches:
  refs/heads/master 170c411e2 -> cd82ef8b3


CAMEL-6867 reworked the camel-hdfs split mode filenames to use UUID generator


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd82ef8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd82ef8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd82ef8b

Branch: refs/heads/master
Commit: cd82ef8b32f90cda1fff6fa118abf902e18371d1
Parents: 170c411
Author: boday <bo...@apache.org>
Authored: Fri Oct 18 14:25:07 2013 -0700
Committer: boday <bo...@apache.org>
Committed: Fri Oct 18 14:25:52 2013 -0700

----------------------------------------------------------------------
 .../component/hdfs/HdfsFileSystemType.java      |  2 -
 .../camel/component/hdfs/HdfsProducer.java      |  5 +--
 .../hdfs/HdfsProducerConsumerTest.java          | 11 +++---
 .../component/hdfs/HdfsProducerSplitTest.java   | 41 ++++++++------------
 4 files changed, 25 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
index 97dcff2..891c4eb 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
@@ -26,7 +26,6 @@ public enum HdfsFileSystemType {
             hpath.append(config.getPath());
             if (config.getSplitStrategies().size() > 0) {
                 hpath.append('/');
-                hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX);
             }
             return hpath;
         }
@@ -43,7 +42,6 @@ public enum HdfsFileSystemType {
             hpath.append(config.getPath());
             if (config.getSplitStrategies().size() > 0) {
                 hpath.append('/');
-                hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX);
             }
             return hpath;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 8ac00eb..4f29e1b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StringHelper;
 
 public class HdfsProducer extends DefaultProducer {
 
@@ -36,7 +37,6 @@ public class HdfsProducer extends DefaultProducer {
     private final AtomicBoolean idle = new AtomicBoolean(false);
     private volatile ScheduledExecutorService scheduler;
     private volatile HdfsOutputStream ostream;
-    private long splitNum;
 
     public static final class SplitStrategy {
         private SplitStrategyType type;
@@ -254,8 +254,7 @@ public class HdfsProducer extends DefaultProducer {
 
     private StringBuilder newFileName() {
         StringBuilder actualPath = new StringBuilder(hdfsPath);
-        actualPath.append(splitNum);
-        splitNum++;
+        actualPath.append(StringHelper.sanitize(getEndpoint().getCamelContext().getUuidGenerator().generateUuid()));
         return actualPath;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
index b169073..4f93257 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.camel.Exchange;
@@ -55,13 +56,15 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
             @Override
             public void configure() {
                 from("direct:start").to("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&splitStrategy=BYTES:5,IDLE:1000");
-                from("hdfs:///" + file.toUri() + "?pattern=seg*&initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result");
+                from("hdfs:///" + file.toUri() + "?initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result");
             }
         });
         context.start();
 
+        List<String> expectedResults = new ArrayList<String>();
         for (int i = 0; i < 10; ++i) {
             template.sendBody("direct:start", "CIAO" + i);
+            expectedResults.add("CIAO" + i);
         }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
@@ -69,11 +72,9 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
         resultEndpoint.expectedMessageCount(10);
         resultEndpoint.assertIsSatisfied();
 
-        int i = 0;
         List<Exchange> exchanges = resultEndpoint.getExchanges();
-        for (Exchange exchange : exchanges) {
-            assertEquals("CIAO" + i++, exchange.getIn().getBody(String.class));
-        }
+        assertEquals(10, exchanges.size());
+        resultEndpoint.expectedBodiesReceivedInAnyOrder(expectedResults);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/cd82ef8b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
index 44bb590..7496a11 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
@@ -16,16 +16,15 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
+import java.io.InputStreamReader;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -65,16 +64,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
         // stop Camel to flush and close file stream
         stopCamelContext();
 
-        for (int i = 0; i < 3; ++i) {
-            InputStream in = null;
-            try {
-                in = new URL("file:///" + BASE_FILE.toUri() + "3/" + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream();
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                IOUtils.copyBytes(in, bos, 4096, false);
-                assertEquals("CIAO" + i, new String(bos.toByteArray()));
-            } finally {
-                IOUtils.closeStream(in);
-            }
+        FileSystem fs = FileSystem.get(new Configuration());
+        FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + "3"));
+        assertEquals(3, status.length);
+        for (int i = 0; i < 3; i++) {
+            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+            assertTrue(br.readLine().startsWith("CIAO"));
+            assertNull(br.readLine());
         }
     }
 
@@ -98,16 +94,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
         }
         stopCamelContext();
 
-        for (int i = 0; i < 10; ++i) {
-            InputStream in = null;
-            try {
-                in = new URL("file:///" + BASE_FILE.toUri() + routeNr + '/' + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream();
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                IOUtils.copyBytes(in, bos, 4096, false);
-                assertEquals("CIAO" + i, new String(bos.toByteArray()));
-            } finally {
-                IOUtils.closeStream(in);
-            }
+        FileSystem fs = FileSystem.get(new Configuration());
+        FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + routeNr));
+        assertEquals(10, status.length);
+        for (int i = 0; i < status.length; i++) {
+            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
+            assertTrue(br.readLine().startsWith("CIAO"));
+            assertNull(br.readLine());
         }
     }