You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/12/04 10:32:36 UTC

svn commit: r723266 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/idempotent/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Thu Dec  4 01:32:34 2008
New Revision: 723266

URL: http://svn.apache.org/viewvc?rev=723266&view=rev
Log:
CAMEL-1099: Added FileIdempotentRepository (work in progress)

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java   (contents, props changed)
      - copied, changed from r722965, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java   (contents, props changed)
      - copied, changed from r722965, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (from r722965, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java&r1=722965&r2=723266&rev=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Thu Dec  4 01:32:34 2008
@@ -16,54 +16,72 @@
  */
 package org.apache.camel.processor.idempotent;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.Map;
+import java.util.Scanner;
 
-import org.apache.camel.Service;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
- * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 
+ * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
  * <p/>
- * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
+ * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
  * memory leak.
  *
  * @version $Revision$
  */
-public class MemoryIdempotentRepository implements IdempotentRepository<String> {
+public class FileIdempotentRepository implements IdempotentRepository<String> {
+    private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
+    private static final String STORE_DELIMITER = "\n";
+    private Map<String, Object> cache;
+    private File store;
+    private long maxStoreSize = 1024 * 1000L; // 1mb store file
 
-    private final Map<String, Object> cache;
-
-    public MemoryIdempotentRepository(Map<String, Object> set) {
+    public FileIdempotentRepository(final File store, final Map<String, Object> set) {
+        this.store = store;
         this.cache = set;
+        loadStore();
     }
 
     /**
-     * Creates a new memory based repository using a {@link LRUCache}
-     * with a default of 1000 entries in the cache.
+     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
+     * as 1st level cache with a default of 1000 entries in the cache.
+     *
+     * @param store  the file store
      */
-    public static IdempotentRepository memoryIdempotentRepository() {
-        return memoryIdempotentRepository(1000);
+    public static IdempotentRepository fileIdempotentRepository(File store) {
+        return fileIdempotentRepository(store, 1000);
     }
 
     /**
-     * Creates a new memory based repository using a {@link LRUCache}.
+     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
+     * as 1st level cache.
      *
+     * @param store  the file store
      * @param cacheSize  the cache size
      */
-    public static IdempotentRepository memoryIdempotentRepository(int cacheSize) {
-        return memoryIdempotentRepository(new LRUCache<String, Object>(cacheSize));
+    public static IdempotentRepository fileIdempotentRepository(File store, int cacheSize) {
+        return fileIdempotentRepository(store, new LRUCache<String, Object>(cacheSize));
     }
 
     /**
-     * Creates a new memory based repository using the given {@link Map} to
-     * use to store the processed message ids.
+     * Creates a new file based repository using the given {@link java.util.Map}
+     * as 1st level cache.
      * <p/>
-     * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
-     * memory leak. 
+     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
+     * memory leak.
+     *
+     * @param store  the file store
+     * @param cache  the cache to use as 1st level cache
      */
-    public static IdempotentRepository memoryIdempotentRepository(Map<String, Object> cache) {
-        return new MemoryIdempotentRepository(cache);
+    public static IdempotentRepository fileIdempotentRepository(File store, Map<String, Object> cache) {
+        return new FileIdempotentRepository(store, cache);
     }
 
     public boolean add(String messageId) {
@@ -72,6 +90,14 @@
                 return false;
             } else {
                 cache.put(messageId, messageId);
+                if (store.length() < maxStoreSize) {
+                    // just append to store
+                    appendToStore(messageId);
+                } else {
+                    // store is full: truncate it by rewriting it from the 1st level cache
+                    trunkStore();
+                }
+
                 return true;
             }
         }
@@ -82,4 +108,111 @@
             return cache.containsKey(key);
         }
     }
-}
+
+    public File getStore() {
+        return store;
+    }
+
+    public void setStore(File store) {
+        this.store = store;
+    }
+
+    public Map<String, Object> getCache() {
+        return cache;
+    }
+
+    public void setCache(Map<String, Object> cache) {
+        this.cache = cache;
+    }
+
+    public long getMaxStoreSize() {
+        return maxStoreSize;
+    }
+
+    /**
+     * Sets the maximum file size for the file store in bytes.
+     * <p/>
+     * The default is 1024 * 1000 bytes (roughly 1 MB).
+     */
+    public void setMaxStoreSize(long maxStoreSize) {
+        this.maxStoreSize = maxStoreSize;
+    }
+
+    /**
+     * Appends the given message id to the file store
+     *
+     * @param messageId  the message id
+     */
+    protected void appendToStore(final String messageId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Appending " + messageId + " to idempotent filestore: " + store);
+        }
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(store, true);
+            fos.write(messageId.getBytes());
+            fos.write(STORE_DELIMITER.getBytes());
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            ObjectHelper.close(fos, "Appending to file idempotent repository", LOG);
+        }
+    }
+
+    /**
+     * Truncates the file store once the max store size is reached, by overwriting it with
+     * the keys currently held in the 1st level cache.
+     */
+    protected void trunkStore() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Trunking idempotent filestore: " + store);
+        }
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(store);
+            for (String key : cache.keySet()) {
+                fos.write(key.getBytes());
+                fos.write(STORE_DELIMITER.getBytes());
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            ObjectHelper.close(fos, "Trunking file idempotent repository", LOG);
+        }
+    }
+
+    /**
+     * Loads the given file store into the 1st level cache
+     */
+    protected void loadStore() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Loading to 1st level cache from idempotent filestore: " + store);
+        }
+
+        if (!store.exists()) {
+            return;
+        }
+
+        cache.clear();
+        Scanner scanner = null;
+        try {
+            scanner = new Scanner(store);
+            scanner.useDelimiter(STORE_DELIMITER);
+            while (scanner.hasNextLine()) {
+                String line = scanner.nextLine();
+                cache.put(line, line);
+            }
+        } catch (IOException e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded " + cache.size() + " to the 1st level cache from idempotent filestore: " + store);
+        }
+    }
+
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=723266&r1=723265&r2=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Thu Dec  4 01:32:34 2008
@@ -18,7 +18,6 @@
 
 import java.util.Map;
 
-import org.apache.camel.Service;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.LRUCache;
 
@@ -32,7 +31,7 @@
  */
 public class MemoryIdempotentRepository implements IdempotentRepository<String> {
 
-    private final Map<String, Object> cache;
+    private Map<String, Object> cache;
 
     public MemoryIdempotentRepository(Map<String, Object> set) {
         this.cache = set;
@@ -60,7 +59,9 @@
      * use to store the processed message ids.
      * <p/>
      * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
-     * memory leak. 
+     * memory leak.
+     *
+     * @param cache  the cache
      */
     public static IdempotentRepository memoryIdempotentRepository(Map<String, Object> cache) {
         return new MemoryIdempotentRepository(cache);
@@ -82,4 +83,12 @@
             return cache.containsKey(key);
         }
     }
+
+    public Map<String, Object> getCache() {
+        return cache;
+    }
+
+    public void setCache(Map<String, Object> cache) {
+        this.cache = cache;
+    }
 }

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (from r722965, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=722965&r2=723266&rev=723266&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java Thu Dec  4 01:32:34 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.io.File;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -23,27 +25,40 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-
-import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.processor.idempotent.FileIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
 
 /**
  * @version $Revision$
  */
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class FileIdempotentConsumerTest extends ContextTestSupport {
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
+    private File store = new File("target/idempotentfilestore.dat");
+    private IdempotentRepository repo;
 
     public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        assertFalse(repo.contains("1"));
+        assertFalse(repo.contains("2"));
+        assertFalse(repo.contains("3"));
+        assertTrue(repo.contains("4"));
+
         resultEndpoint.expectedBodiesReceived("one", "two", "three");
 
         sendMessage("1", "one");
         sendMessage("2", "two");
         sendMessage("1", "one");
         sendMessage("2", "two");
+        sendMessage("4", "four");
         sendMessage("1", "one");
         sendMessage("3", "three");
 
         resultEndpoint.assertIsSatisfied();
+
+        assertTrue(repo.contains("1"));
+        assertTrue(repo.contains("2"));
+        assertTrue(repo.contains("3"));
+        assertTrue(repo.contains("4"));
     }
 
     protected void sendMessage(final Object messageId, final Object body) {
@@ -59,8 +74,17 @@
 
     @Override
     protected void setUp() throws Exception {
-        super.setUp();
+        // delete file store before testing
+        if (store.exists()) {
+            store.delete();
+        }
+
+        repo = FileIdempotentRepository.fileIdempotentRepository(store);
 
+        // let's add 4 to start with
+        repo.add("4");
+
+        super.setUp();
         startEndpoint = resolveMandatoryEndpoint("direct:start");
         resultEndpoint = getMockEndpoint("mock:result");
     }
@@ -68,10 +92,10 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:start").idempotentConsumer(
-                        header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).to("mock:result");
+                from("direct:start")
+                    .idempotentConsumer(header("messageId"), repo)
+                    .to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:mergeinfo =