You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/07/09 12:16:26 UTC
svn commit: r1359022 - in /camel/branches/camel-2.10.x: ./
components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/
components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/
components/camel-hdfs/src/test/resources/
Author: davsclaus
Date: Mon Jul 9 10:16:26 2012
New Revision: 1359022
URL: http://svn.apache.org/viewvc?rev=1359022&view=rev
Log:
CAMEL-5433: Added option connectOnStartup to camel-hdfs. Polished code as well.
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
svn:mergeinfo = /camel/trunk:1359013
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Jul 9 10:16:26 2012
@@ -1 +1 @@
-/camel/trunk:1-1358964
+/camel/trunk:1-1358964,1359013
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java Mon Jul 9 10:16:26 2012
@@ -50,6 +50,7 @@ public class HdfsConfiguration {
private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE;
private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
private List<HdfsProducer.SplitStrategy> splitStrategies;
+ private boolean connectOnStartup = true;
public HdfsConfiguration() {
}
@@ -182,6 +183,9 @@ public class HdfsConfiguration {
throw new IllegalArgumentException("Unrecognized Cache protocol: " + protocol + " for uri: " + uri);
}
hostName = uri.getHost();
+ if (hostName == null) {
+ hostName = "localhost";
+ }
port = uri.getPort() == -1 ? HdfsConstants.DEFAULT_PORT : uri.getPort();
path = uri.getPath();
Map<String, Object> hdfsSettings = URISupport.parseParameters(uri);
@@ -389,4 +393,12 @@ public class HdfsConfiguration {
public void setSplitStrategy(String splitStrategy) {
// noop
}
+
+ public boolean isConnectOnStartup() {
+ return connectOnStartup;
+ }
+
+ public void setConnectOnStartup(boolean connectOnStartup) {
+ this.connectOnStartup = connectOnStartup;
+ }
}
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java Mon Jul 9 10:16:26 2012
@@ -17,16 +17,15 @@
package org.apache.camel.component.hdfs;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.util.IOHelper;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -37,23 +36,56 @@ public final class HdfsConsumer extends
private final HdfsConfiguration config;
private final StringBuilder hdfsPath;
private final Processor processor;
- private AtomicBoolean idle = new AtomicBoolean(false);
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
- private HdfsInputStream istream;
+ private volatile HdfsInputStream istream;
- public HdfsConsumer(DefaultEndpoint endpoint, Processor processor, HdfsConfiguration config) {
+ public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
super(endpoint, processor);
this.config = config;
this.hdfsPath = config.getFileSystemType().getHdfsPath(config);
this.processor = processor;
+
+ setInitialDelay(config.getInitialDelay());
+ setDelay(config.getDelay());
+ setUseFixedDelay(true);
+ }
+
+ @Override
+ public HdfsEndpoint getEndpoint() {
+ return (HdfsEndpoint) super.getEndpoint();
}
@Override
protected void doStart() throws Exception {
- super.setInitialDelay(config.getInitialDelay());
- super.setDelay(config.getDelay());
- super.setUseFixedDelay(false);
super.doStart();
+
+ if (config.isConnectOnStartup()) {
+ // setup hdfs if configured to do on startup
+ setupHdfs(true);
+ }
+ }
+
+ private HdfsInfo setupHdfs(boolean onStartup) throws Exception {
+ // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
+ if (onStartup) {
+ log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+ }
+ }
+
+ // hadoop will cache the connection by default so it's faster to get in the poll method
+ HdfsInfo answer = new HdfsInfo(this.hdfsPath.toString());
+
+ if (onStartup) {
+ log.info("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), hdfsPath.toString()});
+ }
+ }
+ return answer;
}
@Override
@@ -66,7 +98,7 @@ public final class HdfsConsumer extends
int numMessages = 0;
- HdfsInfo info = new HdfsInfo(this.hdfsPath.toString());
+ HdfsInfo info = setupHdfs(false);
FileStatus fileStatuses[];
if (info.getFileSystem().isFile(info.getPath())) {
fileStatuses = info.getFileSystem().globStatus(info.getPath());
@@ -75,10 +107,6 @@ public final class HdfsConsumer extends
fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
}
- if (fileStatuses.length > 0) {
- this.idle.set(false);
- }
-
for (int i = 0; i < fileStatuses.length; ++i) {
FileStatus status = fileStatuses[i];
if (normalFileIsDirectoryNoSuccessFile(status, info)) {
@@ -91,24 +119,39 @@ public final class HdfsConsumer extends
this.rwlock.writeLock().unlock();
}
- Holder<Object> key = new Holder<Object>();
- Holder<Object> value = new Holder<Object>();
- while (this.istream.next(key, value) != 0) {
- Exchange exchange = this.getEndpoint().createExchange();
- Message message = new DefaultMessage();
- message.setHeader(Exchange.FILE_NAME, StringUtils
- .substringAfterLast(status.getPath().toString(), "/"));
- if (key.value != null) {
- message.setHeader(HdfsHeader.KEY.name(), key.value);
+ try {
+ Holder<Object> key = new Holder<Object>();
+ Holder<Object> value = new Holder<Object>();
+ while (this.istream.next(key, value) != 0) {
+ Exchange exchange = this.getEndpoint().createExchange();
+ Message message = new DefaultMessage();
+ String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
+ message.setHeader(Exchange.FILE_NAME, fileName);
+ if (key.value != null) {
+ message.setHeader(HdfsHeader.KEY.name(), key.value);
+ }
+ message.setBody(value.value);
+ exchange.setIn(message);
+
+ log.debug("Processing file {}", fileName);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // in case of unhandled exceptions then let the exception handler handle them
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException(exchange.getException());
+ }
+
+ numMessages++;
}
- message.setBody(value.value);
- exchange.setIn(message);
- this.processor.process(exchange);
- numMessages++;
+ } finally {
+ IOHelper.close(istream, "input stream", log);
}
- this.istream.close();
}
- this.idle.set(true);
+
return numMessages;
}
@@ -122,17 +165,4 @@ public final class HdfsConsumer extends
return false;
}
- public HdfsInputStream getIstream() {
- try {
- rwlock.readLock().lock();
- return istream;
- } finally {
- rwlock.readLock().unlock();
- }
- }
-
- public AtomicBoolean isIdle() {
- return idle;
- }
-
}
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java Mon Jul 9 10:16:26 2012
@@ -31,6 +31,8 @@ public class HdfsInfo {
public HdfsInfo(String hdfsPath) throws IOException {
this.conf = new Configuration();
+ // this will connect to the hadoop hdfs file system, and in case of no connection
+ // then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
this.fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
this.path = new Path(hdfsPath);
}
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java Mon Jul 9 10:16:26 2012
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-public class HdfsInputStream {
+public class HdfsInputStream implements Closeable {
private HdfsFileType fileType;
private String actualPath;
@@ -50,6 +50,7 @@ public class HdfsInputStream {
return ret;
}
+ @Override
public final void close() throws IOException {
if (opened) {
IOUtils.closeStream(in);
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java Mon Jul 9 10:16:26 2012
@@ -26,13 +26,13 @@ import org.apache.camel.TypeConverter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-public class HdfsOutputStream {
+public class HdfsOutputStream implements Closeable {
private HdfsFileType fileType;
private String actualPath;
private String suffixedPath;
private Closeable out;
- private boolean opened;
+ private volatile boolean opened;
private final AtomicLong numOfWrittenBytes = new AtomicLong(0L);
private final AtomicLong numOfWrittenMessages = new AtomicLong(0L);
private final AtomicLong lastAccess = new AtomicLong(Long.MAX_VALUE);
@@ -69,6 +69,7 @@ public class HdfsOutputStream {
return ret;
}
+ @Override
public void close() throws IOException {
if (opened) {
IOUtils.closeStream(out);
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Mon Jul 9 10:16:26 2012
@@ -24,14 +24,15 @@ import java.util.concurrent.atomic.Atomi
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
public class HdfsProducer extends DefaultProducer {
private final HdfsConfiguration config;
private final StringBuilder hdfsPath;
private final AtomicBoolean idle = new AtomicBoolean(false);
- private ScheduledExecutorService scheduler;
- private HdfsOutputStream ostream;
+ private volatile ScheduledExecutorService scheduler;
+ private volatile HdfsOutputStream ostream;
private long splitNum;
public static final class SplitStrategy {
@@ -84,13 +85,18 @@ public class HdfsProducer extends Defaul
}
@Override
+ public HdfsEndpoint getEndpoint() {
+ return (HdfsEndpoint) super.getEndpoint();
+ }
+
+ @Override
protected void doStart() throws Exception {
super.doStart();
- StringBuilder actualPath = new StringBuilder(hdfsPath);
- if (config.getSplitStrategies().size() > 0) {
- actualPath = newFileName();
+
+ // setup hdfs if configured to do on startup
+ if (getEndpoint().getConfig().isConnectOnStartup()) {
+ ostream = setupHdfs(true);
}
- ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
SplitStrategy idleStrategy = null;
for (SplitStrategy strategy : config.getSplitStrategies()) {
@@ -106,6 +112,38 @@ public class HdfsProducer extends Defaul
}
}
+ private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws Exception {
+ if (ostream != null) {
+ return ostream;
+ }
+
+ StringBuilder actualPath = new StringBuilder(hdfsPath);
+ if (config.getSplitStrategies().size() > 0) {
+ actualPath = newFileName();
+ }
+
+ // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
+ if (onStartup) {
+ log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+ }
+ }
+
+ HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+
+ if (onStartup) {
+ log.info("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Connected to hdfs file-system {}:{}/{}", new Object[]{config.getHostName(), config.getPort(), actualPath.toString()});
+ }
+ }
+
+ return answer;
+ }
+
@Override
protected void doStop() throws Exception {
super.doStop();
@@ -113,7 +151,10 @@ public class HdfsProducer extends Defaul
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduler);
scheduler = null;
}
- ostream.close();
+ if (ostream != null) {
+ IOHelper.close(ostream, "output stream", log);
+ ostream = null;
+ }
}
@Override
@@ -121,6 +162,11 @@ public class HdfsProducer extends Defaul
Object body = exchange.getIn().getBody();
Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
+ // must have ostream
+ if (ostream == null) {
+ ostream = setupHdfs(false);
+ }
+
boolean split = false;
List<SplitStrategy> strategies = config.getSplitStrategies();
for (SplitStrategy splitStrategy : strategies) {
@@ -128,18 +174,17 @@ public class HdfsProducer extends Defaul
}
if (split) {
- ostream.close();
+ if (ostream != null) {
+ IOHelper.close(ostream, "output stream", log);
+ }
StringBuilder actualPath = newFileName();
ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
}
+
ostream.append(key, body, exchange.getContext().getTypeConverter());
idle.set(false);
}
- public HdfsOutputStream getOstream() {
- return ostream;
- }
-
private StringBuilder newFileName() {
StringBuilder actualPath = new StringBuilder(hdfsPath);
actualPath.append(splitNum);
@@ -147,10 +192,6 @@ public class HdfsProducer extends Defaul
return actualPath;
}
- public final AtomicBoolean isIdle() {
- return idle;
- }
-
/**
* Idle check background task
*/
@@ -164,6 +205,11 @@ public class HdfsProducer extends Defaul
@Override
public void run() {
+ // only run if ostream has been created
+ if (ostream == null) {
+ return;
+ }
+
HdfsProducer.this.log.trace("IdleCheck running");
if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java Mon Jul 9 10:16:26 2012
@@ -50,13 +50,11 @@ public class HdfsConsumerTest extends Ca
//Hadoop doesn't run on IBM JDK
private static final boolean SKIP = System.getProperty("java.vendor").contains("IBM");
-
@Override
public boolean isUseRouteBuilder() {
return false;
}
-
@Before
public void setUp() throws Exception {
if (SKIP) {
@@ -82,15 +80,16 @@ public class HdfsConsumerTest extends Ca
}
out.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(2);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
}
});
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
-
- resultEndpoint.expectedMessageCount(2);
context.start();
+
resultEndpoint.assertIsSatisfied();
}
@@ -141,19 +140,17 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.message(0).body(byte.class).isEqualTo(3);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
}
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
- List<Exchange> exchanges = resultEndpoint.getReceivedExchanges();
- for (Exchange exchange : exchanges) {
- Assert.assertTrue(exchange.getIn(Byte.class) == value);
- }
resultEndpoint.assertIsSatisfied();
}
@@ -175,6 +172,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "??fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -182,8 +182,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -204,6 +202,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "??fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -211,8 +212,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -234,6 +233,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -241,8 +243,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -264,6 +264,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -271,8 +274,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -294,6 +295,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -301,8 +305,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -324,6 +326,9 @@ public class HdfsConsumerTest extends Ca
writer.sync();
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0").to("mock:result");
@@ -331,8 +336,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
@@ -356,6 +359,9 @@ public class HdfsConsumerTest extends Ca
writer.append(valueWritable);
writer.close();
+ MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+
context.addRoutes(new RouteBuilder() {
public void configure() {
from("hdfs:///" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0").to("mock:result");
@@ -363,8 +369,6 @@ public class HdfsConsumerTest extends Ca
});
context.start();
- MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedMessageCount(1);
resultEndpoint.assertIsSatisfied();
}
Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties?rev=1359022&r1=1359021&r2=1359022&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/resources/log4j.properties Mon Jul 9 10:16:26 2012
@@ -23,6 +23,7 @@ log4j.rootLogger=INFO, file
# uncomment the following line to turn on Camel debugging
#log4j.logger.org.apache.camel=DEBUG
#log4j.logger.org.apache.camel.component.hdfs=TRACE
+#log4j.logger.org.apache.hadoop.ipc=TRACE
# CONSOLE appender not used by default