You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/13 13:33:39 UTC

svn commit: r1045095 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/idempotent/ camel-core/src/main/java/org/apache/camel/spi/ came...

Author: davsclaus
Date: Mon Dec 13 12:33:39 2010
New Revision: 1045095

URL: http://svn.apache.org/viewvc?rev=1045095&view=rev
Log:
CAMEL-3423: IdempotentRepository is now managed in JMX.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java
      - copied, changed from r1045055, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Mon Dec 13 12:33:39 2010
@@ -800,7 +800,7 @@ public class DefaultCamelContext extends
 
             for (LifecycleStrategy strategy : lifecycleStrategies) {
                 if (service instanceof Endpoint) {
-                    // use specialized endpint add
+                    // use specialized endpoint add
                     strategy.onEndpointAdd((Endpoint) service);
                 } else {
                     strategy.onServiceAdd(this, service, null);
@@ -808,7 +808,7 @@ public class DefaultCamelContext extends
             }
 
             // only add to services to close if its a singleton
-            // otherwise we could end up with a lot of endpoints (prototype scoped)
+            // otherwise we could for example end up with a lot of prototype scope endpoints
             boolean singleton = true; // assume singleton by default
             if (service instanceof IsSingleton) {
                 singleton = ((IsSingleton) service).isSingleton();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Mon Dec 13 12:33:39 2010
@@ -28,6 +28,7 @@ import org.apache.camel.builder.Expressi
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Represents an XML <idempotentConsumer/> element
@@ -65,7 +66,7 @@ public class IdempotentConsumerDefinitio
     // Fluent API
     //-------------------------------------------------------------------------
     /**
-     * Set the expression that IdempotentConsumerType will use
+     * Set the expression that the idempotent consumer will use
      * @return the builder
      */
     public ExpressionClause<IdempotentConsumerDefinition> expression() {
@@ -84,7 +85,7 @@ public class IdempotentConsumerDefinitio
     }
     
     /**
-     * Sets the the message id repository for the IdempotentConsumerType
+     * Sets the message id repository for the idempotent consumer
      *
      * @param idempotentRepository  the repository instance of idempotent
      * @return builder
@@ -138,10 +139,15 @@ public class IdempotentConsumerDefinitio
 
         IdempotentRepository<String> idempotentRepository =
             (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
+        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
+
+        // add as service to CamelContext so we can manage it, and to ensure it will be shut down when Camel shuts down
+        routeContext.getCamelContext().addService(idempotentRepository);
 
         Expression expression = getExpression().createExpression(routeContext);
         // should be eager by default
         boolean isEager = isEager() != null ? isEager() : true;
+
         return new IdempotentConsumer(expression, idempotentRepository, isEager, childProcessor);
     }
 
@@ -152,7 +158,7 @@ public class IdempotentConsumerDefinitio
      * @return the repository
      */
     protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
-        if (idempotentRepository == null) {
+        if (idempotentRepository == null && messageIdRepositoryRef != null) {
             idempotentRepository = routeContext.lookup(messageIdRepositoryRef, IdempotentRepository.class);
         }
         return idempotentRepository;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.LRUCache;
@@ -30,6 +31,10 @@ import org.apache.camel.util.ObjectHelpe
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
 /**
  * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
  * <p/>
@@ -38,7 +43,8 @@ import org.apache.commons.logging.LogFac
  *
  * @version $Revision$
  */
-public class FileIdempotentRepository implements IdempotentRepository<String> {
+@ManagedResource("FileIdempotentRepository")
+public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
     private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
     private static final String STORE_DELIMITER = "\n";
     private Map<String, Object> cache;
@@ -105,20 +111,16 @@ public class FileIdempotentRepository im
         return new FileIdempotentRepository(store, cache);
     }
 
-    public boolean add(String messageId) {
+    @ManagedOperation(description = "Adds the key to the store")
+    public boolean add(String key) {
         synchronized (cache) {
-            // init store if not loaded before
-            if (init.compareAndSet(false, true)) {
-                loadStore();
-            }
-
-            if (cache.containsKey(messageId)) {
+            if (cache.containsKey(key)) {
                 return false;
             } else {
-                cache.put(messageId, messageId);
+                cache.put(key, key);
                 if (fileStore.length() < maxFileStoreSize) {
                     // just append to store
-                    appendToStore(messageId);
+                    appendToStore(key);
                 } else {
                     // trunk store and flush the cache
                     trunkStore();
@@ -129,23 +131,17 @@ public class FileIdempotentRepository im
         }
     }
 
+    @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
         synchronized (cache) {
-            // init store if not loaded before
-            if (init.compareAndSet(false, true)) {
-                loadStore();
-            }
             return cache.containsKey(key);
         }
     }
 
+    @ManagedOperation(description = "Remove the key from the store")
     public boolean remove(String key) {
         boolean answer;
         synchronized (cache) {
-            // init store if not loaded before
-            if (init.compareAndSet(false, true)) {
-                loadStore();
-            }
             answer = cache.remove(key) != null;
             // trunk store and flush the cache on remove
             trunkStore();
@@ -166,6 +162,11 @@ public class FileIdempotentRepository im
         this.fileStore = fileStore;
     }
 
+    @ManagedAttribute(description = "The file path for the store")
+    public String getFilePath() {
+        return fileStore.getPath();
+    }
+
     public Map<String, Object> getCache() {
         return cache;
     }
@@ -174,15 +175,17 @@ public class FileIdempotentRepository im
         this.cache = cache;
     }
 
+    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
     public long getMaxFileStoreSize() {
         return maxFileStoreSize;
     }
 
     /**
-     * Sets the maximum filesize for the file store in bytes.
+     * Sets the maximum file size for the file store in bytes.
      * <p/>
      * The default is 1mb.
      */
+    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
     public void setMaxFileStoreSize(long maxFileStoreSize) {
         this.maxFileStoreSize = maxFileStoreSize;
     }
@@ -197,6 +200,27 @@ public class FileIdempotentRepository im
         cache = new LRUCache<String, Object>(size);
     }
 
+    @ManagedAttribute(description = "The current cache size")
+    public int getCacheSize() {
+        if (cache != null) {
+            return cache.size();
+        }
+        return 0;
+    }
+
+    /**
+     * Resets and clears the store to force it to reload from file
+     */
+    @ManagedOperation(description = "Reset and reloads the file store")
+    public synchronized void reset() {
+        synchronized (cache) {
+            // trunk and clear, before we reload the store
+            trunkStore();
+            cache.clear();
+            loadStore();
+        }
+    }
+
     /**
      * Appends the given message id to the file store
      *
@@ -279,4 +303,20 @@ public class FileIdempotentRepository im
         }
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        // init store if not loaded before
+        if (init.compareAndSet(false, true)) {
+            loadStore();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // trunk the store and clear the cache, then mark as not loaded so a restart reloads it
+        trunkStore();
+        cache.clear();
+        init.set(false);
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Dec 13 12:33:39 2010
@@ -74,7 +74,7 @@ public class IdempotentConsumer extends 
             // add the key to the repository
             newKey = idempotentRepository.add(messageId);
         } else {
-            // check if we alrady have the key
+            // check if we already have the key
             newKey = !idempotentRepository.contains(messageId);
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -18,9 +18,14 @@ package org.apache.camel.processor.idemp
 
 import java.util.Map;
 
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.LRUCache;
 
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
 /**
  * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 
  * <p/>
@@ -29,9 +34,9 @@ import org.apache.camel.util.LRUCache;
  *
  * @version $Revision$
  */
-public class MemoryIdempotentRepository implements IdempotentRepository<String> {
-
-    private Map<String, Object> cache;
+@ManagedResource("MemoryIdempotentRepository")
+public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+    private final Map<String, Object> cache;
 
     public MemoryIdempotentRepository() {
         this.cache = new LRUCache<String, Object>(1000);
@@ -71,23 +76,26 @@ public class MemoryIdempotentRepository 
         return new MemoryIdempotentRepository(cache);
     }
 
-    public boolean add(String messageId) {
+    @ManagedOperation(description = "Adds the key to the store")
+    public boolean add(String key) {
         synchronized (cache) {
-            if (cache.containsKey(messageId)) {
+            if (cache.containsKey(key)) {
                 return false;
             } else {
-                cache.put(messageId, messageId);
+                cache.put(key, key);
                 return true;
             }
         }
     }
 
+    @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
         synchronized (cache) {
             return cache.containsKey(key);
         }
     }
 
+    @ManagedOperation(description = "Remove the key from the store")
     public boolean remove(String key) {
         synchronized (cache) {
             return cache.remove(key) != null;
@@ -103,7 +111,17 @@ public class MemoryIdempotentRepository 
         return cache;
     }
 
-    public void setCache(Map<String, Object> cache) {
-        this.cache = cache;
+    @ManagedAttribute(description = "The current cache size")
+    public int getCacheSize() {
+        return cache.size();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        cache.clear();
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Service;
+
 /**
  * Access to a repository of Message IDs to implement the
  * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
@@ -24,7 +26,7 @@ package org.apache.camel.spi;
  *
  * @version $Revision$
  */
-public interface IdempotentRepository<E> {
+public interface IdempotentRepository<E> extends Service {
 
     /**
      * Adds the key to the repository.

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Mon Dec 13 12:33:39 2010
@@ -104,6 +104,12 @@ public class FileConsumerIdempotentRefTe
         public boolean confirm(String key) {
             return true;
         }
+
+        public void start() throws Exception {
+        }
+
+        public void stop() throws Exception {
+        }
     }
     
 }
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java (from r1045055, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&r1=1045055&r2=1045095&rev=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java Mon Dec 13 12:33:39 2010
@@ -14,10 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.processor;
+package org.apache.camel.management;
 
 import java.io.File;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -25,19 +29,54 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.processor.idempotent.FileIdempotentRepository;
 import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FileUtil;
 
 /**
  * @version $Revision$
  */
-public class FileIdempotentConsumerTest extends ContextTestSupport {
+public class ManagedFileIdempotentConsumerTest extends ContextTestSupport {
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
     private File store = new File("target/idempotentfilestore.dat");
     private IdempotentRepository<String> repo;
 
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        DefaultManagementNamingStrategy naming = (DefaultManagementNamingStrategy) context.getManagementStrategy().getManagementNamingStrategy();
+        naming.setHostName("localhost");
+        naming.setDomainName("org.apache.camel");
+        return context;
+    }
+
     public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+        // services
+        Set<ObjectName> names = CastUtils.cast(mbeanServer.queryNames(new ObjectName("org.apache.camel" + ":type=services,*"), null));
+        ObjectName on = null;
+        for (ObjectName name : names) {
+            if (name.toString().contains("FileIdempotentRepository")) {
+                on = name;
+                break;
+            }
+        }
+
+        assertTrue("Should be registered", mbeanServer.isRegistered(on));
+        String path = (String) mbeanServer.getAttribute(on, "FilePath");
+        assertEquals(FileUtil.normalizePath("target/idempotentfilestore.dat"), FileUtil.normalizePath(path));
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(1, size.intValue());
+
         assertFalse(repo.contains("1"));
         assertFalse(repo.contains("2"));
         assertFalse(repo.contains("3"));
@@ -59,6 +98,24 @@ public class FileIdempotentConsumerTest 
         assertTrue(repo.contains("2"));
         assertTrue(repo.contains("3"));
         assertTrue(repo.contains("4"));
+
+        size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(4, size.intValue());
+
+        // remove one from repo
+        mbeanServer.invoke(on, "remove", new Object[]{"1"}, new String[]{"java.lang.String"});
+
+        // reset
+        mbeanServer.invoke(on, "reset", null, null);
+
+        // there should be 3 now
+        size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(3, size.intValue());
+
+        assertFalse(repo.contains("1"));
+        assertTrue(repo.contains("2"));
+        assertTrue(repo.contains("3"));
+        assertTrue(repo.contains("4"));
     }
 
     protected void sendMessage(final Object messageId, final Object body) {

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java?rev=1045095&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java Mon Dec 13 12:33:39 2010
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FileUtil;
+
+/**
+ * @version $Revision: 937951 $
+ */
+public class ManagedMemoryIdempotentConsumerTest extends ContextTestSupport {
+    protected Endpoint startEndpoint;
+    protected MockEndpoint resultEndpoint;
+    private IdempotentRepository<String> repo;
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        DefaultManagementNamingStrategy naming = (DefaultManagementNamingStrategy) context.getManagementStrategy().getManagementNamingStrategy();
+        naming.setHostName("localhost");
+        naming.setDomainName("org.apache.camel");
+        return context;
+    }
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+        // services
+        Set<ObjectName> names = CastUtils.cast(mbeanServer.queryNames(new ObjectName("org.apache.camel" + ":type=services,*"), null));
+        ObjectName on = null;
+        for (ObjectName name : names) {
+            if (name.toString().contains("MemoryIdempotentRepository")) {
+                on = name;
+                break;
+            }
+        }
+        assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(1, size.intValue());
+
+        assertFalse(repo.contains("1"));
+        assertFalse(repo.contains("2"));
+        assertFalse(repo.contains("3"));
+        assertTrue(repo.contains("4"));
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("4", "four");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        resultEndpoint.assertIsSatisfied();
+
+        assertTrue(repo.contains("1"));
+        assertTrue(repo.contains("2"));
+        assertTrue(repo.contains("3"));
+        assertTrue(repo.contains("4"));
+
+        size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(4, size.intValue());
+
+        // remove one from repo
+        mbeanServer.invoke(on, "remove", new Object[]{"1"}, new String[]{"java.lang.String"});
+
+        // there should be 3 now
+        size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+        assertEquals(3, size.intValue());
+
+        assertFalse(repo.contains("1"));
+        assertTrue(repo.contains("2"));
+        assertTrue(repo.contains("3"));
+        assertTrue(repo.contains("4"));
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send(startEndpoint, new Processor() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        repo = MemoryIdempotentRepository.memoryIdempotentRepository();
+        // lets start with 4
+        repo.add("4");
+
+        super.setUp();
+        startEndpoint = resolveMandatoryEndpoint("direct:start");
+        resultEndpoint = getMockEndpoint("mock:result");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .idempotentConsumer(header("messageId"), repo)
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java Mon Dec 13 12:33:39 2010
@@ -57,6 +57,7 @@ public class FileIdempotentTrunkStoreTes
 
         // load in new store and verify we only have the last 5 elements
         IdempotentRepository<String> repo2 = FileIdempotentRepository.fileIdempotentRepository(store);
+        repo2.start();
         assertFalse(repo2.contains("AAAAAAAAAA"));
         assertTrue(repo2.contains("BBBBBBBBBB"));
         assertTrue(repo2.contains("CCCCCCCCCC"));
@@ -92,6 +93,7 @@ public class FileIdempotentTrunkStoreTes
 
         // 5 elements in cache, and 50 bytes as max size limit for when trunking should start
         repo = FileIdempotentRepository.fileIdempotentRepository(store, 5, 50);
+        repo.start();
 
         super.setUp();
         startEndpoint = resolveMandatoryEndpoint("direct:start");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java Mon Dec 13 12:33:39 2010
@@ -89,8 +89,7 @@ public class IdempotentConsumerUsingCust
     }
 
     private final class MyRepo implements IdempotentRepository<String> {
-
-        private Map<String, String> cache = new HashMap<String, String>();
+        private final Map<String, String> cache = new HashMap<String, String>();
 
         private MyRepo() {
             // pre start with 4 already in there
@@ -118,6 +117,14 @@ public class IdempotentConsumerUsingCust
             // noop
             return true;
         }
+
+        public void start() throws Exception {
+            // noop
+        }
+
+        public void stop() throws Exception {
+            // noop
+        }
     }
 
 }

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Mon Dec 13 12:33:39 2010
@@ -20,7 +20,11 @@ import java.util.List;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
 
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.IdempotentRepository;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
 import org.springframework.orm.jpa.JpaTemplate;
 import org.springframework.orm.jpa.JpaTransactionManager;
 import org.springframework.transaction.TransactionDefinition;
@@ -31,11 +35,12 @@ import org.springframework.transaction.s
 /**
  * @version $Revision$
  */
-public class JpaMessageIdRepository implements IdempotentRepository<String> {
+@ManagedResource("JpaMessageIdRepository")
+public class JpaMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> {
     protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2";
-    private JpaTemplate jpaTemplate;
-    private String processorName;
-    private TransactionTemplate transactionTemplate;
+    private final JpaTemplate jpaTemplate;
+    private final String processorName;
+    private final TransactionTemplate transactionTemplate;
 
     public JpaMessageIdRepository(JpaTemplate template, String processorName) {
         this(template, createTransactionTemplate(template), processorName);
@@ -63,6 +68,7 @@ public class JpaMessageIdRepository impl
         return transactionTemplate;
     }
 
+    @ManagedOperation(description = "Adds the key to the store")
     @SuppressWarnings("unchecked")
     public boolean add(final String messageId) {
         // Run this in single transaction.
@@ -84,6 +90,7 @@ public class JpaMessageIdRepository impl
         return rc.booleanValue();
     }
 
+    @ManagedOperation(description = "Does the store contain the given key")
     @SuppressWarnings("unchecked")
     public boolean contains(final String messageId) {
         // Run this in single transaction.
@@ -100,6 +107,7 @@ public class JpaMessageIdRepository impl
         return rc.booleanValue();
     }
 
+    @ManagedOperation(description = "Remove the key from the store")
     @SuppressWarnings("unchecked")
     public boolean remove(final String messageId) {
         Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() {
@@ -123,4 +131,16 @@ public class JpaMessageIdRepository impl
         return true;
     }
 
+    @ManagedAttribute(description = "The processor name")
+    public String getProcessorName() {
+        return processorName;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
 }