You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/11/29 18:29:58 UTC
svn commit: r721695 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/
test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Sat Nov 29 09:29:56 2008
New Revision: 721695
URL: http://svn.apache.org/viewvc?rev=721695&view=rev
Log:
CAMEL-1112: Refactored file consumer. Removed @deprecated options. Added idempotent option. Work in progress, as sorting and filtering are still needed.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java (contents, props changed)
- copied, changed from r721650, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
Removed:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRouteGenerateEmptyExchangeWhenIdleTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java Sat Nov 29 09:29:56 2008
@@ -22,6 +22,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
/**
* The <a href="http://activemq.apache.org/camel/file.html">File Component</a>
@@ -42,6 +43,16 @@
*/
public static final String HEADER_FILE_NAME_PRODUCED = "org.apache.camel.file.name.produced";
+ /**
+ * Header key holding the value: zero-based index of the current file within the batch being consumed
+ */
+ public static final String HEADER_FILE_BATCH_INDEX = "org.apache.camel.file.index";
+
+ /**
+ * Header key holding the value: total number of files in the batch being consumed
+ */
+ public static final String HEADER_FILE_BATCH_TOTAL = "org.apache.camel.file.total";
+
public FileComponent() {
}
@@ -52,6 +63,14 @@
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
File file = new File(remaining);
FileEndpoint result = new FileEndpoint(file, uri, this);
+
+ // lookup idempotent repository in registry if provided
+ String ref = getAndRemoveParameter(parameters, "idempotentRepositoryRef", String.class);
+ if (ref != null) {
+ MessageIdRepository repository = mandatoryLookup(ref, MessageIdRepository.class);
+ result.setIdempotentRepository(repository);
+ }
+
setProperties(result, parameters);
return result;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Sat Nov 29 09:29:56 2008
@@ -21,7 +21,8 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Processor;
@@ -40,17 +41,6 @@
private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
private FileEndpoint endpoint;
- private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
- private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>();
- private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>();
-
- // the options below is @deprecated and will be removed in Camel 2.0
- private long lastPollTime;
- private int unchangedDelay;
- private boolean unchangedSize;
- private boolean generateEmptyExchangeWhenIdle;
- private boolean alwaysConsume;
-
private boolean recursive;
private String regexPattern = "";
private boolean exclusiveReadLock = true;
@@ -61,80 +51,68 @@
}
protected synchronized void poll() throws Exception {
- // should be true the first time as its the top directory
- int rc = pollFileOrDirectory(endpoint.getFile(), true);
-
- // if no files consumes and using generateEmptyExchangeWhenIdle option then process an empty exchange
- if (rc == 0 && generateEmptyExchangeWhenIdle) {
- final FileExchange exchange = endpoint.createExchange((File)null);
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- }
- });
+ // gather list of files to process
+ List<File> files = new ArrayList<File>();
+ filesToPoll(endpoint.getFile(), true, files);
+
+ // TODO allow reordering of files CAMEL-1112
+
+ // consume files one by one
+ int total = files.size();
+ for (int index = 0; index < files.size(); index++) {
+ File file = files.get(index);
+ processFile(file, total, index);
}
-
- lastPollTime = System.currentTimeMillis();
}
/**
- * Pools the given file or directory for files to process.
+ * Scans the given file or directory for files to process.
*
- * @param fileOrDirectory file or directory
+ * @param fileOrDirectory current file or directory when doing recursion
* @param processDir recursive
- * @return the number of files processed or being processed async.
+ * @param fileList current list of files gathered
*/
- protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
+ protected void filesToPoll(File fileOrDirectory, boolean processDir, List<File> fileList) {
+ if (fileOrDirectory == null || !fileOrDirectory.exists()) {
+ // null or no longer exists, so there is nothing to scan
+ return;
+ }
+
if (!fileOrDirectory.isDirectory()) {
- // process the file
- return pollFile(fileOrDirectory);
+ addFile(fileOrDirectory, fileList);
} else if (processDir) {
// directory that can be recursive
- int rc = 0;
- if (isValidFile(fileOrDirectory)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Polling directory " + fileOrDirectory);
- }
- File[] files = fileOrDirectory.listFiles();
- for (File file : files) {
- rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling directory " + fileOrDirectory);
+ }
+ File[] files = fileOrDirectory.listFiles();
+ for (File file : files) {
+ // recursively add the files
+ filesToPoll(file, isRecursive(), fileList);
}
- return rc;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping directory " + fileOrDirectory);
}
- return 0;
}
}
/**
- * Polls the given file
+ * Processes the given file
*
* @param file the file
- * @return returns 1 if the file was processed, 0 otherwise.
+ * @param total total number of files in this batch
+ * @param index current index out of total in this batch
*/
- protected int pollFile(final File file) {
+ protected void processFile(final File file, int total, int index) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Polling file: " + file);
- }
-
- if (!file.exists()) {
- return 0;
- }
- if (!isValidFile(file)) {
- return 0;
- }
- // we only care about file modified times if we are not deleting/moving files
- if (!endpoint.isNoop()) {
- if (filesBeingProcessed.contains(file)) {
- return 1;
- }
- filesBeingProcessed.put(file, file);
+ LOG.trace("Processing file: " + file);
}
final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
final FileExchange exchange = endpoint.createExchange(file);
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
endpoint.configureMessage(file, exchange.getIn());
try {
@@ -173,21 +151,16 @@
if (!committed) {
processStrategyRollback(processStrategy, exchange, file);
}
- filesBeingProcessed.remove(file);
}
}
});
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " can not process file: " + file);
- }
+ LOG.warn(endpoint + " can not process file: " + file);
}
- } catch (Throwable e) {
+ } catch (Exception e) {
handleException(e);
}
-
- return 1;
}
/**
@@ -252,74 +225,33 @@
processStrategy.rollback(endpoint, exchange, file);
}
+ /**
+ * Strategy for validating if the given file should be included or not
+ * @param file the file
+ * @return true to include the file, false to skip it
+ */
protected boolean isValidFile(File file) {
- boolean result = false;
- if (file != null && file.exists()) {
- // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
- if (isMatched(file) && (alwaysConsume || isChanged(file))) {
- result = true;
- }
- }
- return result;
- }
-
- protected boolean isChanged(File file) {
- if (file == null) {
- // Sanity check
+ // NOTE: contains() also adds the file name to the repository when it was not already present (a miss)
+ if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getName())) {
+ // skip as we have already processed it
return false;
- } else if (file.isDirectory()) {
- // Allow recursive polling to descend into this directory
- return true;
- } else {
- // @deprecated will be removed on Camel 2.0
- // the code below is kinda hard to maintain. We should strive to remove
- // this stuff in Camel 2.0 to keep this component simple and no surprises for end-users
- // this stuff is not persistent so restarting Camel will reset the state
- boolean lastModifiedCheck = false;
- long modifiedDuration = 0;
- if (getUnchangedDelay() > 0) {
- modifiedDuration = System.currentTimeMillis() - file.lastModified();
- lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
- }
-
- long fileModified = file.lastModified();
- Long previousModified = noopMap.get(file);
- noopMap.put(file, fileModified);
- if (previousModified == null || fileModified > previousModified) {
- lastModifiedCheck = true;
- }
-
- boolean sizeCheck = false;
- long sizeDifference = 0;
- if (isUnchangedSize()) {
- Long value = fileSizes.get(file);
- if (value == null) {
- sizeCheck = true;
- } else {
- sizeCheck = file.length() != value;
- }
- }
-
- boolean answer = lastModifiedCheck || sizeCheck;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
- + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
- + modifiedDuration + ")");
- }
-
- if (isUnchangedSize()) {
- if (answer) {
- fileSizes.put(file, file.length());
- } else {
- fileSizes.remove(file);
- }
- }
-
- return answer;
}
+
+ return isMatched(file);
}
+ /**
+ * Strategy to perform file matching based on endpoint configuration.
+ * <p/>
+ * Will always return false for certain files:
+ * <ul>
+ * <li>Starting with a dot</li>
+ * <li>lock files</li>
+ * </ul>
+ *
+ * @param file the file
+ * @return true if the file is matched, false if not
+ */
protected boolean isMatched(File file) {
String name = file.getName();
@@ -357,6 +289,12 @@
return true;
}
+ private void addFile(File file, List<File> fileList) {
+ if (isValidFile(file)) {
+ fileList.add(file);
+ }
+ }
+
public boolean isRecursive() {
return this.recursive;
}
@@ -373,39 +311,6 @@
this.regexPattern = regexPattern;
}
- public boolean isGenerateEmptyExchangeWhenIdle() {
- return generateEmptyExchangeWhenIdle;
- }
-
- /**
- * @deprecated will be removed in Camel 2.0
- */
- public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
- this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
- }
-
- public int getUnchangedDelay() {
- return unchangedDelay;
- }
-
- /**
- * @deprecated will be removed in Camel 2.0
- */
- public void setUnchangedDelay(int unchangedDelay) {
- this.unchangedDelay = unchangedDelay;
- }
-
- public boolean isUnchangedSize() {
- return unchangedSize;
- }
-
- /**
- * @deprecated will be removed in Camel 2.0
- */
- public void setUnchangedSize(boolean unchangedSize) {
- this.unchangedSize = unchangedSize;
- }
-
public boolean isExclusiveReadLock() {
return exclusiveReadLock;
}
@@ -414,25 +319,4 @@
this.exclusiveReadLock = exclusiveReadLock;
}
- public boolean isAlwaysConsume() {
- return alwaysConsume;
- }
-
- /**
- * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
- */
- public void setAlwaysConsume(boolean alwaysConsume) {
- this.alwaysConsume = alwaysConsume;
- }
-
- public boolean isTimestamp() {
- return !alwaysConsume;
- }
-
- /**
- * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
- */
- public void setTimestamp(boolean timestamp) {
- this.alwaysConsume = !timestamp;
- }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Sat Nov 29 09:29:56 2008
@@ -31,6 +31,8 @@
import org.apache.camel.Producer;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
import org.apache.camel.util.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.UuidGenerator;
@@ -49,6 +51,7 @@
private static final transient Log LOG = LogFactory.getLog(FileEndpoint.class);
private static final transient String DEFAULT_STRATEGYFACTORY_CLASS =
"org.apache.camel.component.file.strategy.FileProcessStrategyFactory";
+ private static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
private File file;
private FileProcessStrategy fileProcessStrategy;
@@ -65,6 +68,8 @@
private boolean ignoreFileNameHeader;
private Expression expression;
private String tempPrefix;
+ private boolean idempotent;
+ private MessageIdRepository idempotentRepository;
protected FileEndpoint(File file, String endpointUri, FileComponent component) {
super(endpointUri, component);
@@ -89,6 +94,13 @@
public Consumer createConsumer(Processor processor) throws Exception {
Consumer result = new FileConsumer(this, processor);
+
+ // if idempotent and no repository set then create a default one
+ if (isIdempotent() && idempotentRepository == null) {
+ LOG.info("Using default memory based idempotent repository with cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
+ idempotentRepository = MemoryMessageIdRepository.memoryMessageIdRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
+ }
+
configureConsumer(result);
return result;
}
@@ -306,6 +318,22 @@
this.tempPrefix = tempPrefix;
}
+ public boolean isIdempotent() {
+ return idempotent;
+ }
+
+ public void setIdempotent(boolean idempotent) {
+ this.idempotent = idempotent;
+ }
+
+ public MessageIdRepository getIdempotentRepository() {
+ return idempotentRepository;
+ }
+
+ public void setIdempotentRepository(MessageIdRepository idempotentRepository) {
+ this.idempotentRepository = idempotentRepository;
+ }
+
/**
* A strategy method to lazily create the file strategy
*/
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java?rev=721695&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java Sat Nov 29 09:29:56 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for consuming a batch of files (multiple files in one consume)
+ */
+public class FileConsumerBatchTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/file-batch"); // clean the directory this test writes to and polls (not target/idempotent)
+ template.sendBodyAndHeader("file://target/file-batch/", "Hello World", FileComponent.HEADER_FILE_NAME, "hello.txt"); // first file of the batch
+ template.sendBodyAndHeader("file://target/file-batch/", "Bye World", FileComponent.HEADER_FILE_NAME, "bye.txt"); // second file of the batch
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("file://target/file-batch?consumer.delay=1000").to("mock:result");
+ }
+ };
+ }
+
+ public void testConsumeBatch() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+ // test header keys
+ mock.message(0).header(FileComponent.HEADER_FILE_BATCH_TOTAL).isEqualTo(2);
+ mock.message(0).header(FileComponent.HEADER_FILE_BATCH_INDEX).isEqualTo(0);
+ mock.message(1).header(FileComponent.HEADER_FILE_BATCH_INDEX).isEqualTo(1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
\ No newline at end of file
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=721695&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Sat Nov 29 09:29:56 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+/**
+ * Unit test for the idempotentRepositoryRef option.
+ */
+public class FileConsumerIdempotentRefTest extends ContextTestSupport {
+
+ private static boolean invoked;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myRepo", new MyIdempotentRepository());
+ return jndi;
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/idempotent");
+ template.sendBodyAndHeader("file://target/idempotent/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("file://target/idempotent/?idempotent=true&idempotentRepositoryRef=myRepo&moveNamePrefix=done/").to("mock:result");
+ }
+ };
+ }
+
+ public void testIdempotentRef() throws Exception {
+ // consume the file the first time
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(1000);
+
+ // reset mock and set new expectations
+ mock.reset();
+ mock.expectedMessageCount(0);
+
+ // move file back
+ File file = new File("target/idempotent/done/report.txt");
+ File renamed = new File("target/idempotent/report.txt");
+ file = file.getAbsoluteFile();
+ file.renameTo(renamed.getAbsoluteFile());
+
+ // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
+ Thread.sleep(2000);
+ assertMockEndpointsSatisfied();
+
+ assertTrue("MyIdempotentRepository should have been invoked", invoked);
+ }
+
+ public class MyIdempotentRepository implements MessageIdRepository {
+
+ public boolean contains(String messageId) {
+ // will return false 1st time, and true 2nd time
+ boolean result = invoked;
+ invoked = true;
+ assertEquals("report.txt", messageId);
+ return result;
+ }
+ }
+
+}
\ No newline at end of file
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java (from r721650, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java&r1=721650&r2=721695&rev=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java Sat Nov 29 09:29:56 2008
@@ -23,27 +23,27 @@
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for the alwaysConsume=false option.
+ * Unit test for the idempotent=true option.
*/
-public class FileAlwaysConsumeFalseTest extends ContextTestSupport {
+public class FileConsumerIdempotentTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
super.setUp();
- deleteDirectory("target/alwaysconsume");
- template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
+ deleteDirectory("target/idempotent");
+ template.sendBodyAndHeader("file://target/idempotent/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
+ from("file://target/idempotent/?idempotent=true&moveNamePrefix=done/").to("mock:result");
}
};
}
- public void testNotAlwaysConsume() throws Exception {
+ public void testIdempotent() throws Exception {
// consume the file the first time
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
@@ -58,12 +58,12 @@
mock.expectedMessageCount(0);
// move file back
- File file = new File("target/alwaysconsume/done/report.txt");
- File renamed = new File("target/alwaysconsume/report.txt");
+ File file = new File("target/idempotent/done/report.txt");
+ File renamed = new File("target/idempotent/report.txt");
file = file.getAbsoluteFile();
file.renameTo(renamed.getAbsoluteFile());
- // should NOT consume the file again, let 2 secs pass to let the consuemr try to consume it but it should not
+ // should NOT consume the file again, let 2 secs pass to let the consumer try to consume it but it should not
Thread.sleep(2000);
assertMockEndpointsSatisfied();
}
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileDeleteRouteTest.java Sat Nov 29 09:29:56 2008
@@ -24,6 +24,7 @@
* @version $Revision$
*/
public class FileDeleteRouteTest extends FileRouteTest {
+
@Override
protected void setUp() throws Exception {
targetdir = "target/test-delete-inbox";
@@ -33,17 +34,19 @@
@Override
public void testFileRoute() throws Exception {
+ deleteDirectory("target/test-delete-inbox");
+
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedBodiesReceived(expectedBody);
- result.setResultWaitTime(5000);
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
- Thread.sleep(4000);
-
result.assertIsSatisfied();
+ Thread.sleep(100);
+
for (String lockName : recorder.getLocks()) {
File lock = new File(lockName);
+ lock = lock.getAbsoluteFile();
assertFalse(lock.exists());
}
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryTest.java Sat Nov 29 09:29:56 2008
@@ -41,13 +41,13 @@
result.setResultWaitTime(5000);
// now lets wait a bit and move that file
- Thread.sleep(5000);
+ Thread.sleep(2000);
// lets delete the output directory
deleteDirectory(outputDirectory);
// now lets wait a bit for it to be polled
- Thread.sleep(5000);
+ Thread.sleep(2000);
File file = new File(inputDirectory + "/" + fileName);
@@ -66,7 +66,7 @@
assertFileExists(newFile);
// now lets wait for multiple polls to check we only process it once
- Thread.sleep(5000);
+ Thread.sleep(3000);
assertMockEndpointsSatisfied();
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java?rev=721695&r1=721694&r2=721695&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/MoveFilesToDirectoryWithNoopTest.java Sat Nov 29 09:29:56 2008
@@ -23,6 +23,6 @@
@Override
protected String getOutputEndpointUri() {
- return super.getOutputEndpointUri() + "?noop=true";
+ return super.getOutputEndpointUri() + "?noop=true&idempotent=true";
}
}