You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/07/09 12:16:26 UTC

svn commit: r1359022 - in /camel/branches/camel-2.10.x: ./ components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/ components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/ components/camel-hdfs/src/test/resources/

Author: davsclaus
Date: Mon Jul  9 10:16:26 2012
New Revision: 1359022

URL: http://svn.apache.org/viewvc?rev=1359022&view=rev
Log:
CAMEL-5433: Added option connectOnStartup to camel-hdfs. Polished code as well.

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
    svn:mergeinfo = /camel/trunk:1359013

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Jul  9 10:16:26 2012
@@ -1 +1 @@
-/camel/trunk:1-1358964
+/camel/trunk:1-1358964,1359013

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java Mon Jul  9 10:16:26 2012
@@ -50,6 +50,7 @@ public class HdfsConfiguration {
     private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE;
     private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
     private List<HdfsProducer.SplitStrategy> splitStrategies;
+    private boolean connectOnStartup = true;
 
     public HdfsConfiguration() {
     }
@@ -182,6 +183,9 @@ public class HdfsConfiguration {
             throw new IllegalArgumentException("Unrecognized Cache protocol: " + protocol + " for uri: " + uri);
         }
         hostName = uri.getHost();
+        if (hostName == null) {
+            hostName = "localhost";
+        }
         port = uri.getPort() == -1 ? HdfsConstants.DEFAULT_PORT : uri.getPort();
         path = uri.getPath();
         Map<String, Object> hdfsSettings = URISupport.parseParameters(uri);
@@ -389,4 +393,12 @@ public class HdfsConfiguration {
     public void setSplitStrategy(String splitStrategy) {
         // noop
     }
+
+    public boolean isConnectOnStartup() {
+        return connectOnStartup;
+    }
+
+    public void setConnectOnStartup(boolean connectOnStartup) {
+        this.connectOnStartup = connectOnStartup;
+    }
 }

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java Mon Jul  9 10:16:26 2012
@@ -17,16 +17,15 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.util.IOHelper;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -37,23 +36,56 @@ public final class HdfsConsumer extends 
     private final HdfsConfiguration config;
     private final StringBuilder hdfsPath;
     private final Processor processor;
-    private AtomicBoolean idle = new AtomicBoolean(false);
     private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
-    private HdfsInputStream istream;
+    private volatile HdfsInputStream istream;
 
-    public HdfsConsumer(DefaultEndpoint endpoint, Processor processor, HdfsConfiguration config) {
+    public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
         super(endpoint, processor);
         this.config = config;
         this.hdfsPath = config.getFileSystemType().getHdfsPath(config);
         this.processor = processor;
+
+        setInitialDelay(config.getInitialDelay());
+        setDelay(config.getDelay());
+        setUseFixedDelay(true);
+    }
+
+    @Override
+    public HdfsEndpoint getEndpoint() {
+        return (HdfsEndpoint) super.getEndpoint();
     }
 
     @Override
     protected void doStart() throws Exception {
-        super.setInitialDelay(config.getInitialDelay());
-        super.setDelay(config.getDelay());
-        super.setUseFixedDelay(false);
         super.doStart();
+
+        if (config.isConnectOnStartup()) {
+            // setup hdfs if configured to do on startup
+            setupHdfs(true);
+        }
+    }
+
+    private HdfsInfo setupHdfs(boolean onStartup) throws Exception {
+        // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
+        if (onStartup) {
+            log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+            }
+        }
+
+        // hadoop will cache the connection by default so its faster to get in the poll method
+        HdfsInfo answer = new HdfsInfo(this.hdfsPath.toString());
+
+        if (onStartup) {
+            log.info("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+            }
+        }
+        return answer;
     }
 
     @Override
@@ -66,7 +98,7 @@ public final class HdfsConsumer extends 
 
         int numMessages = 0;
 
-        HdfsInfo info = new HdfsInfo(this.hdfsPath.toString());
+        HdfsInfo info = setupHdfs(false);
         FileStatus fileStatuses[];
         if (info.getFileSystem().isFile(info.getPath())) {
             fileStatuses = info.getFileSystem().globStatus(info.getPath());
@@ -75,10 +107,6 @@ public final class HdfsConsumer extends 
             fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
         }
 
-        if (fileStatuses.length > 0) {
-            this.idle.set(false);
-        }
-
         for (int i = 0; i < fileStatuses.length; ++i) {
             FileStatus status = fileStatuses[i];
             if (normalFileIsDirectoryNoSuccessFile(status, info)) {
@@ -91,24 +119,39 @@ public final class HdfsConsumer extends 
                 this.rwlock.writeLock().unlock();
             }
 
-            Holder<Object> key = new Holder<Object>();
-            Holder<Object> value = new Holder<Object>();
-            while (this.istream.next(key, value) != 0) {
-                Exchange exchange = this.getEndpoint().createExchange();
-                Message message = new DefaultMessage();
-                message.setHeader(Exchange.FILE_NAME, StringUtils
-                        .substringAfterLast(status.getPath().toString(), "/"));
-                if (key.value != null) {
-                    message.setHeader(HdfsHeader.KEY.name(), key.value);
+            try {
+                Holder<Object> key = new Holder<Object>();
+                Holder<Object> value = new Holder<Object>();
+                while (this.istream.next(key, value) != 0) {
+                    Exchange exchange = this.getEndpoint().createExchange();
+                    Message message = new DefaultMessage();
+                    String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
+                    message.setHeader(Exchange.FILE_NAME, fileName);
+                    if (key.value != null) {
+                        message.setHeader(HdfsHeader.KEY.name(), key.value);
+                    }
+                    message.setBody(value.value);
+                    exchange.setIn(message);
+
+                    log.debug("Processing file {}", fileName);
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
+                    }
+
+                    // in case of unhandled exceptions then let the exception handler handle them
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException(exchange.getException());
+                    }
+
+                    numMessages++;
                 }
-                message.setBody(value.value);
-                exchange.setIn(message);
-                this.processor.process(exchange);
-                numMessages++;
+            } finally {
+                IOHelper.close(istream, "input stream", log);
             }
-            this.istream.close();
         }
-        this.idle.set(true);
+
         return numMessages;
     }
 
@@ -122,17 +165,4 @@ public final class HdfsConsumer extends 
         return false;
     }
 
-    public HdfsInputStream getIstream() {
-        try {
-            rwlock.readLock().lock();
-            return istream;
-        } finally {
-            rwlock.readLock().unlock();
-        }
-    }
-
-    public AtomicBoolean isIdle() {
-        return idle;
-    }
-
 }

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java Mon Jul  9 10:16:26 2012
@@ -31,6 +31,8 @@ public class HdfsInfo {
 
     public HdfsInfo(String hdfsPath) throws IOException {
         this.conf = new Configuration();
+        // this will connect to the hadoop hdfs file system, and in case of no connection
+        // then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
         this.fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
         this.path = new Path(hdfsPath);
     }

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java Mon Jul  9 10:16:26 2012
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
-public class HdfsInputStream {
+public class HdfsInputStream implements Closeable {
 
     private HdfsFileType fileType;
     private String actualPath;
@@ -50,6 +50,7 @@ public class HdfsInputStream {
         return ret;
     }
 
+    @Override
     public final void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(in);

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java Mon Jul  9 10:16:26 2012
@@ -26,13 +26,13 @@ import org.apache.camel.TypeConverter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
-public class HdfsOutputStream {
+public class HdfsOutputStream implements Closeable {
 
     private HdfsFileType fileType;
     private String actualPath;
     private String suffixedPath;
     private Closeable out;
-    private boolean opened;
+    private volatile boolean opened;
     private final AtomicLong numOfWrittenBytes = new AtomicLong(0L);
     private final AtomicLong numOfWrittenMessages = new AtomicLong(0L);
     private final AtomicLong lastAccess = new AtomicLong(Long.MAX_VALUE);
@@ -69,6 +69,7 @@ public class HdfsOutputStream {
         return ret;
     }
 
+    @Override
     public void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(out);

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Mon Jul  9 10:16:26 2012
@@ -24,14 +24,15 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
 
 public class HdfsProducer extends DefaultProducer {
 
     private final HdfsConfiguration config;
     private final StringBuilder hdfsPath;
     private final AtomicBoolean idle = new AtomicBoolean(false);
-    private ScheduledExecutorService scheduler;
-    private HdfsOutputStream ostream;
+    private volatile ScheduledExecutorService scheduler;
+    private volatile HdfsOutputStream ostream;
     private long splitNum;
 
     public static final class SplitStrategy {
@@ -84,13 +85,18 @@ public class HdfsProducer extends Defaul
     }
 
     @Override
+    public HdfsEndpoint getEndpoint() {
+        return (HdfsEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
-        StringBuilder actualPath = new StringBuilder(hdfsPath);
-        if (config.getSplitStrategies().size() > 0) {
-            actualPath = newFileName();
+
+        // setup hdfs if configured to do on startup
+        if (getEndpoint().getConfig().isConnectOnStartup()) {
+            ostream = setupHdfs(true);
         }
-        ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
 
         SplitStrategy idleStrategy = null;
         for (SplitStrategy strategy : config.getSplitStrategies()) {
@@ -106,6 +112,38 @@ public class HdfsProducer extends Defaul
         }
     }
 
+    private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws Exception {
+        if (ostream != null) {
+            return ostream;
+        }
+
+        StringBuilder actualPath = new StringBuilder(hdfsPath);
+        if (config.getSplitStrategies().size() > 0) {
+            actualPath = newFileName();
+        }
+
+        // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
+        if (onStartup) {
+            log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+            }
+        }
+
+        HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+
+        if (onStartup) {
+            log.info("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+            }
+        }
+
+        return answer;
+    }
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -113,7 +151,10 @@ public class HdfsProducer extends Defaul
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
             scheduler = null;
         }
-        ostream.close();
+        if (ostream != null) {
+            IOHelper.close(ostream, "output stream", log);
+            ostream = null;
+        }
     }
 
     @Override
@@ -121,6 +162,11 @@ public class HdfsProducer extends Defaul
         Object body = exchange.getIn().getBody();
         Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
 
+        // must have ostream
+        if (ostream == null) {
+            ostream = setupHdfs(false);
+        }
+
         boolean split = false;
         List<SplitStrategy> strategies = config.getSplitStrategies();
         for (SplitStrategy splitStrategy : strategies) {
@@ -128,18 +174,17 @@ public class HdfsProducer extends Defaul
         }
 
         if (split) {
-            ostream.close();
+            if (ostream != null) {
+                IOHelper.close(ostream, "output stream", log);
+            }
             StringBuilder actualPath = newFileName();
             ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
         }
+
         ostream.append(key, body, exchange.getContext().getTypeConverter());
         idle.set(false);
     }
 
-    public HdfsOutputStream getOstream() {
-        return ostream;
-    }
-
     private StringBuilder newFileName() {
         StringBuilder actualPath = new StringBuilder(hdfsPath);
         actualPath.append(splitNum);
@@ -147,10 +192,6 @@ public class HdfsProducer extends Defaul
         return actualPath;
     }
 
-    public final AtomicBoolean isIdle() {
-        return idle;
-    }
-
     /**
      * Idle check background task
      */
@@ -164,6 +205,11 @@ public class HdfsProducer extends Defaul
 
         @Override
         public void run() {
+            // only run if ostream has been created
+            if (ostream == null) {
+                return;
+            }
+
             HdfsProducer.this.log.trace("IdleCheck running");
 
             if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java Mon Jul  9 10:16:26 2012
@@ -50,13 +50,11 @@ public class HdfsConsumerTest extends Ca
     //Hadoop doesn't run on IBM JDK
     private static final boolean SKIP = System.getProperty("java.vendor").contains("IBM");
 
-
     @Override
     public boolean isUseRouteBuilder() {
         return false;
     }
 
-
     @Before
     public void setUp() throws Exception {
         if (SKIP) {
@@ -82,15 +80,16 @@ public class HdfsConsumerTest extends Ca
         }
         out.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(2);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
             }
         });
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-
-        resultEndpoint.expectedMessageCount(2);
         context.start();
+
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -141,19 +140,17 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.message(0).body(byte.class).isEqualTo(3);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
             }
         });
         context.start();
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
 
-        resultEndpoint.expectedMessageCount(1);
-        List<Exchange> exchanges = resultEndpoint.getReceivedExchanges();
-        for (Exchange exchange : exchanges) {
-            Assert.assertTrue(exchange.getIn(Byte.class) == value);
-        }
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -175,6 +172,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -182,8 +182,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -204,6 +202,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -211,8 +212,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -234,6 +233,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -241,8 +243,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -264,6 +264,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -271,8 +274,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -294,6 +295,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -301,8 +305,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -324,6 +326,9 @@ public class HdfsConsumerTest extends Ca
         writer.sync();
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -331,8 +336,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -356,6 +359,9 @@ public class HdfsConsumerTest extends Ca
         writer.append(valueWritable);
         writer.close();
 
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("hdfs:///" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0").to("mock:result");
@@ -363,8 +369,6 @@ public class HdfsConsumerTest extends Ca
         });
         context.start();
 
-        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
         resultEndpoint.assertIsSatisfied();
     }
 

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties Mon Jul  9 10:16:26 2012
@@ -23,6 +23,7 @@ log4j.rootLogger=INFO, file
 # uncomment the following line to turn on Camel debugging
 #log4j.logger.org.apache.camel=DEBUG
 #log4j.logger.org.apache.camel.component.hdfs=TRACE
+#log4j.logger.org.apache.hadoop.ipc=TRACE
 
 
 # CONSOLE appender not used by default