You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/12/04 10:32:36 UTC
svn commit: r723266 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/idempotent/
test/java/org/apache/camel/processor/
Author: davsclaus
Date: Thu Dec 4 01:32:34 2008
New Revision: 723266
URL: http://svn.apache.org/viewvc?rev=723266&view=rev
Log:
CAMEL-1099: Added FileIdempotentRepository (work in progress)
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (contents, props changed)
- copied, changed from r722965, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (contents, props changed)
- copied, changed from r722965, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (from r722965, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java&r1=722965&r2=723266&rev=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Thu Dec 4 01:32:34 2008
@@ -16,54 +16,72 @@
*/
package org.apache.camel.processor.idempotent;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.Map;
+import java.util.Scanner;
-import org.apache.camel.Service;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
- * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
+ * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
* <p/>
- * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
+ * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
* memory leak.
*
* @version $Revision$
*/
-public class MemoryIdempotentRepository implements IdempotentRepository<String> {
+public class FileIdempotentRepository implements IdempotentRepository<String> {
+ private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
+ private static final String STORE_DELIMITER = "\n";
+ private Map<String, Object> cache;
+ private File store;
+ private long maxStoreSize = 1024 * 1000L; // 1mb store file
- private final Map<String, Object> cache;
-
- public MemoryIdempotentRepository(Map<String, Object> set) {
+ public FileIdempotentRepository(final File store, final Map<String, Object> set) {
+ this.store = store;
this.cache = set;
+ loadStore();
}
/**
- * Creates a new memory based repository using a {@link LRUCache}
- * with a default of 1000 entries in the cache.
+ * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
+ * as 1st level cache with a default of 1000 entries in the cache.
+ *
+ * @param store the file store
*/
- public static IdempotentRepository memoryIdempotentRepository() {
- return memoryIdempotentRepository(1000);
+ public static IdempotentRepository fileIdempotentRepository(File store) {
+ return fileIdempotentRepository(store, 1000);
}
/**
- * Creates a new memory based repository using a {@link LRUCache}.
+ * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
+ * as 1st level cache.
*
+ * @param store the file store
* @param cacheSize the cache size
*/
- public static IdempotentRepository memoryIdempotentRepository(int cacheSize) {
- return memoryIdempotentRepository(new LRUCache<String, Object>(cacheSize));
+ public static IdempotentRepository fileIdempotentRepository(File store, int cacheSize) {
+ return fileIdempotentRepository(store, new LRUCache<String, Object>(cacheSize));
}
/**
- * Creates a new memory based repository using the given {@link Map} to
- * use to store the processed message ids.
+ * Creates a new file based repository using the given {@link java.util.Map}
+ * as 1st level cache.
* <p/>
- * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
- * memory leak.
+ * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
+ * memory leak.
+ *
+ * @param store the file store
+ * @param cache the cache to use as 1st level cache
*/
- public static IdempotentRepository memoryIdempotentRepository(Map<String, Object> cache) {
- return new MemoryIdempotentRepository(cache);
+ public static IdempotentRepository fileIdempotentRepository(File store, Map<String, Object> cache) {
+ return new FileIdempotentRepository(store, cache);
}
public boolean add(String messageId) {
@@ -72,6 +90,14 @@
return false;
} else {
cache.put(messageId, messageId);
+ if (store.length() < maxStoreSize) {
+ // just append to store
+ appendToStore(messageId);
+ } else {
+ // store has reached its max size: truncate it and rewrite it from the 1st level cache
+ trunkStore();
+ }
+
return true;
}
}
@@ -82,4 +108,111 @@
return cache.containsKey(key);
}
}
-}
+
+ public File getStore() {
+ return store;
+ }
+
+ public void setStore(File store) {
+ this.store = store;
+ }
+
+ public Map<String, Object> getCache() {
+ return cache;
+ }
+
+ public void setCache(Map<String, Object> cache) {
+ this.cache = cache;
+ }
+
+ public long getMaxStoreSize() {
+ return maxStoreSize;
+ }
+
+ /**
+ * Sets the maximum file size for the file store in bytes.
+ * <p/>
+ * The default is 1024 * 1000 bytes (roughly 1 MB).
+ */
+ public void setMaxStoreSize(long maxStoreSize) {
+ this.maxStoreSize = maxStoreSize;
+ }
+
+ /**
+ * Appends the given message id to the file store
+ *
+ * @param messageId the message id
+ */
+ protected void appendToStore(final String messageId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Appending " + messageId + " to idempotent filestore: " + store);
+ }
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(store, true);
+ fos.write(messageId.getBytes());
+ fos.write(STORE_DELIMITER.getBytes());
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ ObjectHelper.close(fos, "Appending to file idempotent repository", LOG);
+ }
+ }
+
+ /**
+ * Truncates the file store when the max store size is hit, by overwriting the
+ * file store with the keys currently held in the 1st level cache.
+ */
+ protected void trunkStore() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trunking idempotent filestore: " + store);
+ }
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(store);
+ for (String key : cache.keySet()) {
+ fos.write(key.getBytes());
+ fos.write(STORE_DELIMITER.getBytes());
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ ObjectHelper.close(fos, "Trunking file idempotent repository", LOG);
+ }
+ }
+
+ /**
+ * Loads the file store into the 1st level cache, clearing the cache first; does nothing if the store file does not exist
+ */
+ protected void loadStore() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Loading to 1st level cache from idempotent filestore: " + store);
+ }
+
+ if (!store.exists()) {
+ return;
+ }
+
+ cache.clear();
+ Scanner scanner = null;
+ try {
+ scanner = new Scanner(store);
+ scanner.useDelimiter(STORE_DELIMITER);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ cache.put(line, line);
+ }
+ } catch (IOException e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded " + cache.size() + " to the 1st level cache from idempotent filestore: " + store);
+ }
+ }
+
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=723266&r1=723265&r2=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Thu Dec 4 01:32:34 2008
@@ -18,7 +18,6 @@
import java.util.Map;
-import org.apache.camel.Service;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
@@ -32,7 +31,7 @@
*/
public class MemoryIdempotentRepository implements IdempotentRepository<String> {
- private final Map<String, Object> cache;
+ private Map<String, Object> cache;
public MemoryIdempotentRepository(Map<String, Object> set) {
this.cache = set;
@@ -60,7 +59,9 @@
* use to store the processed message ids.
* <p/>
* Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
- * memory leak.
+ * memory leak.
+ *
+ * @param cache the cache
*/
public static IdempotentRepository memoryIdempotentRepository(Map<String, Object> cache) {
return new MemoryIdempotentRepository(cache);
@@ -82,4 +83,12 @@
return cache.containsKey(key);
}
}
+
+ public Map<String, Object> getCache() {
+ return cache;
+ }
+
+ public void setCache(Map<String, Object> cache) {
+ this.cache = cache;
+ }
}
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (from r722965, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=722965&r2=723266&rev=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java Thu Dec 4 01:32:34 2008
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.io.File;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -23,27 +25,40 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-
-import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.processor.idempotent.FileIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
/**
* @version $Revision$
*/
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class FileIdempotentConsumerTest extends ContextTestSupport {
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
+ private File store = new File("target/idempotentfilestore.dat");
+ private IdempotentRepository repo;
public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ assertFalse(repo.contains("1"));
+ assertFalse(repo.contains("2"));
+ assertFalse(repo.contains("3"));
+ assertTrue(repo.contains("4"));
+
resultEndpoint.expectedBodiesReceived("one", "two", "three");
sendMessage("1", "one");
sendMessage("2", "two");
sendMessage("1", "one");
sendMessage("2", "two");
+ sendMessage("4", "four");
sendMessage("1", "one");
sendMessage("3", "three");
resultEndpoint.assertIsSatisfied();
+
+ assertTrue(repo.contains("1"));
+ assertTrue(repo.contains("2"));
+ assertTrue(repo.contains("3"));
+ assertTrue(repo.contains("4"));
}
protected void sendMessage(final Object messageId, final Object body) {
@@ -59,8 +74,17 @@
@Override
protected void setUp() throws Exception {
- super.setUp();
+ // delete file store before testing
+ if (store.exists()) {
+ store.delete();
+ }
+
+ repo = FileIdempotentRepository.fileIdempotentRepository(store);
+ // let's add 4 to start with
+ repo.add("4");
+
+ super.setUp();
startEndpoint = resolveMandatoryEndpoint("direct:start");
resultEndpoint = getMockEndpoint("mock:result");
}
@@ -68,10 +92,10 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:start").idempotentConsumer(
- header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).to("mock:result");
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), repo)
+ .to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
svn:mergeinfo =