You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/08/10 09:45:39 UTC
svn commit: r802679 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/processor/
test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Mon Aug 10 07:45:38 2009
New Revision: 802679
URL: http://svn.apache.org/viewvc?rev=802679&view=rev
Log:
CAMEL-1895: poll enrich from a file based endpoint is currently not supported if the starting endpoint is also file based.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
- copied, changed from r802661, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Aug 10 07:45:38 2009
@@ -192,7 +192,7 @@
// register on completion callback that does the completiom stategies
// (for instance to move the file after we have processed it)
- exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations));
+ exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target));
// process the exchange
getProcessor().process(exchange);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java Mon Aug 10 07:45:38 2009
@@ -37,10 +37,12 @@
private GenericFileEndpoint<T> endpoint;
private GenericFileOperations<T> operations;
private ExceptionHandler exceptionHandler;
+ private GenericFile<T> file;
- public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
+ public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations, GenericFile<T> file) {
this.endpoint = endpoint;
this.operations = operations;
+ this.file = file;
}
@SuppressWarnings("unchecked")
@@ -67,10 +69,6 @@
protected void onCompletion(Exchange exchange) {
GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
- // after processing
- final GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
- boolean failed = exchange.isFailed();
-
if (log.isDebugEnabled()) {
log.debug("Done processing file: " + file + " using exchange: " + exchange);
}
@@ -78,6 +76,7 @@
// commit or rollback
boolean committed = false;
try {
+ boolean failed = exchange.isFailed();
if (!failed) {
// commit the file strategy if there was no failure or already handled by the DeadLetterChannel
processStrategyCommit(processStrategy, exchange, file);
@@ -116,7 +115,7 @@
try {
if (log.isTraceEnabled()) {
- log.trace("Committing remote file strategy: " + processStrategy + " for file: " + file);
+ log.trace("Commit file strategy: " + processStrategy + " for file: " + file);
}
processStrategy.commit(operations, endpoint, exchange, file);
} catch (Exception e) {
@@ -134,11 +133,8 @@
protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
Exchange exchange, GenericFile<T> file) {
- // only WARN in case we do not handle it ourself by moving failed files
- if (endpoint.getMoveFailed() == null) {
- if (log.isWarnEnabled()) {
- log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file);
- }
+ if (log.isWarnEnabled()) {
+ log.warn("Rollback file strategy: " + processStrategy + " for file: " + file);
}
try {
processStrategy.rollback(operations, endpoint, exchange, file);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=802679&r1=802678&r2=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Mon Aug 10 07:45:38 2009
@@ -20,7 +20,7 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
@@ -111,6 +111,8 @@
* @param exchange input data.
*/
public void process(Exchange exchange) throws Exception {
+ preChceckPoll(exchange);
+
Exchange resourceExchange;
if (timeout < 0) {
if (LOG.isDebugEnabled()) {
@@ -156,6 +158,28 @@
}
/**
+ * Strategy to pre check polling.
+ * <p/>
+ * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
+ * started from a file based endpoint as that is not currently supported.
+ *
+ * @param exchange the current exchange
+ */
+ protected void preChceckPoll(Exchange exchange) throws Exception {
+ // cannot poll a file endpoint if already consuming from a file endpoint (CAMEL-1895)
+ if (consumer instanceof EventDrivenPollingConsumer) {
+ EventDrivenPollingConsumer edpc = (EventDrivenPollingConsumer) consumer;
+ boolean fileBasedConsumer = edpc.getEndpoint().getEndpointKey().startsWith("file") || edpc.getEndpoint().getEndpointKey().startsWith("ftp");
+ boolean fileBasedExchange = exchange.getFromEndpoint().getEndpointUri().startsWith("file") || exchange.getFromEndpoint().getEndpointUri().startsWith("ftp");
+ if (fileBasedConsumer && fileBasedExchange) {
+ throw new IllegalArgumentException("Camel durrently does not support pollEnrich from a file/ftp endpoint"
+ + " when the route also started from a file/ftp endpoint."
+ + " Started from: " + exchange.getFromEndpoint().getEndpointUri() + " pollEnrich: " + edpc.getEndpoint().getEndpointUri());
+ }
+ }
+ }
+
+ /**
* Creates a new {@link org.apache.camel.impl.DefaultExchange} instance from the given
* <code>exchange</code>. The resulting exchange's pattern is defined by
* <code>pattern</code>.
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java (from r802661, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java&r1=802661&r2=802679&rev=802679&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileUsingProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java Mon Aug 10 07:45:38 2009
@@ -16,19 +16,15 @@
*/
package org.apache.camel.component.file;
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.util.FileUtil;
/**
* @version $Revision$
*/
-public class FileConsumePollEnrichFileUsingProcessorTest extends ContextTestSupport {
+public class FileConsumePollEnrichFileTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
@@ -37,7 +33,9 @@
super.setUp();
}
- public void testPollEnrich() throws Exception {
+
+ // TODO: CAMEL-1895
+ public void xxxTestPollEnrich() throws Exception {
getMockEndpoint("mock:start").expectedBodiesReceived("Start");
MockEndpoint mock = getMockEndpoint("mock:result");
@@ -45,50 +43,28 @@
mock.expectedFileExists("target/enrich/.done/AAA.fin");
mock.expectedFileExists("target/enrichdata/.done/AAA.dat");
- mock.expectedFileExists("target/enrichdata/BBB.dat");
template.sendBodyAndHeader("file://target/enrichdata", "Big file", Exchange.FILE_NAME, "AAA.dat");
- template.sendBodyAndHeader("file://target/enrichdata", "Other Big file", Exchange.FILE_NAME, "BBB.dat");
template.sendBodyAndHeader("file://target/enrich", "Start", Exchange.FILE_NAME, "AAA.fin");
assertMockEndpointsSatisfied();
}
+ public void testNothing() {
+ //
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("file://target/enrich?move=.done")
- .process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- String name = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
- name = FileUtil.stripExt(name) + ".dat";
-
- // use a consumer template to get the data file
- Exchange data = null;
- ConsumerTemplate con = exchange.getContext().createConsumerTemplate();
- try {
- // try to get the data file
- data = con.receive("file://target/enrichdata?move=.done&fileName=" + name, 5000);
- } finally {
- // stop the consumer as it does not need to poll for files anymore
- con.stop();
- }
-
- // if we found the data file then process it by sending it to the direct:data endpoint
- if (data != null) {
- template.send("direct:data", data);
- } else {
- // otherwise do a rollback
- throw new CamelExchangeException("Cannot find the data file " + name, exchange);
- }
- }
- }).to("mock:start");
-
- from("direct:data")
+ from("file://target/enrich?move=.done&readLock=none")
+ .to("mock:start")
+ .pollEnrich("file://target/enrichdata?move=.done&readLock=none")
.to("mock:result");
}
};
}
-}
+
+}
\ No newline at end of file