You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/13 13:33:39 UTC
svn commit: r1045095 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/idempotent/
camel-core/src/main/java/org/apache/camel/spi/ came...
Author: davsclaus
Date: Mon Dec 13 12:33:39 2010
New Revision: 1045095
URL: http://svn.apache.org/viewvc?rev=1045095&view=rev
Log:
CAMEL-3423: IdempotentRepository is now managed in JMX.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java
- copied, changed from r1045055, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Mon Dec 13 12:33:39 2010
@@ -800,7 +800,7 @@ public class DefaultCamelContext extends
for (LifecycleStrategy strategy : lifecycleStrategies) {
if (service instanceof Endpoint) {
- // use specialized endpint add
+ // use specialized endpoint add
strategy.onEndpointAdd((Endpoint) service);
} else {
strategy.onServiceAdd(this, service, null);
@@ -808,7 +808,7 @@ public class DefaultCamelContext extends
}
// only add to services to close if its a singleton
- // otherwise we could end up with a lot of endpoints (prototype scoped)
+ // otherwise we could for example end up with a lot of prototype scope endpoints
boolean singleton = true; // assume singleton by default
if (service instanceof IsSingleton) {
singleton = ((IsSingleton) service).isSingleton();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Mon Dec 13 12:33:39 2010
@@ -28,6 +28,7 @@ import org.apache.camel.builder.Expressi
import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
/**
* Represents an XML <idempotentConsumer/> element
@@ -65,7 +66,7 @@ public class IdempotentConsumerDefinitio
// Fluent API
//-------------------------------------------------------------------------
/**
- * Set the expression that IdempotentConsumerType will use
+ * Set the expression that the idempotent consumer will use
* @return the builder
*/
public ExpressionClause<IdempotentConsumerDefinition> expression() {
@@ -84,7 +85,7 @@ public class IdempotentConsumerDefinitio
}
/**
- * Sets the the message id repository for the IdempotentConsumerType
+ * Sets the message id repository for the idempotent consumer
*
* @param idempotentRepository the repository instance of idempotent
* @return builder
@@ -138,10 +139,15 @@ public class IdempotentConsumerDefinitio
IdempotentRepository<String> idempotentRepository =
(IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
+ ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
+
+ // add as service to CamelContext so we can manage it, and to ensure it is shut down when Camel shuts down
+ routeContext.getCamelContext().addService(idempotentRepository);
Expression expression = getExpression().createExpression(routeContext);
// should be eager by default
boolean isEager = isEager() != null ? isEager() : true;
+
return new IdempotentConsumer(expression, idempotentRepository, isEager, childProcessor);
}
@@ -152,7 +158,7 @@ public class IdempotentConsumerDefinitio
* @return the repository
*/
protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
- if (idempotentRepository == null) {
+ if (idempotentRepository == null && messageIdRepositoryRef != null) {
idempotentRepository = routeContext.lookup(messageIdRepositoryRef, IdempotentRepository.class);
}
return idempotentRepository;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.LRUCache;
@@ -30,6 +31,10 @@ import org.apache.camel.util.ObjectHelpe
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
/**
* A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
* <p/>
@@ -38,7 +43,8 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public class FileIdempotentRepository implements IdempotentRepository<String> {
+@ManagedResource("FileIdempotentRepository")
+public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
private static final String STORE_DELIMITER = "\n";
private Map<String, Object> cache;
@@ -105,20 +111,16 @@ public class FileIdempotentRepository im
return new FileIdempotentRepository(store, cache);
}
- public boolean add(String messageId) {
+ @ManagedOperation(description = "Adds the key to the store")
+ public boolean add(String key) {
synchronized (cache) {
- // init store if not loaded before
- if (init.compareAndSet(false, true)) {
- loadStore();
- }
-
- if (cache.containsKey(messageId)) {
+ if (cache.containsKey(key)) {
return false;
} else {
- cache.put(messageId, messageId);
+ cache.put(key, key);
if (fileStore.length() < maxFileStoreSize) {
// just append to store
- appendToStore(messageId);
+ appendToStore(key);
} else {
// trunk store and flush the cache
trunkStore();
@@ -129,23 +131,17 @@ public class FileIdempotentRepository im
}
}
+ @ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
synchronized (cache) {
- // init store if not loaded before
- if (init.compareAndSet(false, true)) {
- loadStore();
- }
return cache.containsKey(key);
}
}
+ @ManagedOperation(description = "Remove the key from the store")
public boolean remove(String key) {
boolean answer;
synchronized (cache) {
- // init store if not loaded before
- if (init.compareAndSet(false, true)) {
- loadStore();
- }
answer = cache.remove(key) != null;
// trunk store and flush the cache on remove
trunkStore();
@@ -166,6 +162,11 @@ public class FileIdempotentRepository im
this.fileStore = fileStore;
}
+ @ManagedAttribute(description = "The file path for the store")
+ public String getFilePath() {
+ return fileStore.getPath();
+ }
+
public Map<String, Object> getCache() {
return cache;
}
@@ -174,15 +175,17 @@ public class FileIdempotentRepository im
this.cache = cache;
}
+ @ManagedAttribute(description = "The maximum file size for the file store in bytes")
public long getMaxFileStoreSize() {
return maxFileStoreSize;
}
/**
- * Sets the maximum filesize for the file store in bytes.
+ * Sets the maximum file size for the file store in bytes.
* <p/>
* The default is 1mb.
*/
+ @ManagedAttribute(description = "The maximum file size for the file store in bytes")
public void setMaxFileStoreSize(long maxFileStoreSize) {
this.maxFileStoreSize = maxFileStoreSize;
}
@@ -197,6 +200,27 @@ public class FileIdempotentRepository im
cache = new LRUCache<String, Object>(size);
}
+ @ManagedAttribute(description = "The current cache size")
+ public int getCacheSize() {
+ if (cache != null) {
+ return cache.size();
+ }
+ return 0;
+ }
+
+ /**
+ * Resets and clears the store to force it to reload from file
+ */
+ @ManagedOperation(description = "Reset and reloads the file store")
+ public synchronized void reset() {
+ synchronized (cache) {
+ // trunk and clear, before we reload the store
+ trunkStore();
+ cache.clear();
+ loadStore();
+ }
+ }
+
/**
* Appends the given message id to the file store
*
@@ -279,4 +303,20 @@ public class FileIdempotentRepository im
}
}
+ @Override
+ protected void doStart() throws Exception {
+ // init store if not loaded before
+ if (init.compareAndSet(false, true)) {
+ loadStore();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // trunk the store and clear the cache, then mark as not loaded so a restart reloads the store
+ trunkStore();
+ cache.clear();
+ init.set(false);
+ }
+
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Dec 13 12:33:39 2010
@@ -74,7 +74,7 @@ public class IdempotentConsumer extends
// add the key to the repository
newKey = idempotentRepository.add(messageId);
} else {
- // check if we alrady have the key
+ // check if we already have the key
newKey = !idempotentRepository.contains(messageId);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -18,9 +18,14 @@ package org.apache.camel.processor.idemp
import java.util.Map;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.LRUCache;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
/**
* A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
* <p/>
@@ -29,9 +34,9 @@ import org.apache.camel.util.LRUCache;
*
* @version $Revision$
*/
-public class MemoryIdempotentRepository implements IdempotentRepository<String> {
-
- private Map<String, Object> cache;
+@ManagedResource("MemoryIdempotentRepository")
+public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+ private final Map<String, Object> cache;
public MemoryIdempotentRepository() {
this.cache = new LRUCache<String, Object>(1000);
@@ -71,23 +76,26 @@ public class MemoryIdempotentRepository
return new MemoryIdempotentRepository(cache);
}
- public boolean add(String messageId) {
+ @ManagedOperation(description = "Adds the key to the store")
+ public boolean add(String key) {
synchronized (cache) {
- if (cache.containsKey(messageId)) {
+ if (cache.containsKey(key)) {
return false;
} else {
- cache.put(messageId, messageId);
+ cache.put(key, key);
return true;
}
}
}
+ @ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
synchronized (cache) {
return cache.containsKey(key);
}
}
+ @ManagedOperation(description = "Remove the key from the store")
public boolean remove(String key) {
synchronized (cache) {
return cache.remove(key) != null;
@@ -103,7 +111,17 @@ public class MemoryIdempotentRepository
return cache;
}
- public void setCache(Map<String, Object> cache) {
- this.cache = cache;
+ @ManagedAttribute(description = "The current cache size")
+ public int getCacheSize() {
+ return cache.size();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ cache.clear();
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Dec 13 12:33:39 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.spi;
+import org.apache.camel.Service;
+
/**
* Access to a repository of Message IDs to implement the
* <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
@@ -24,7 +26,7 @@ package org.apache.camel.spi;
*
* @version $Revision$
*/
-public interface IdempotentRepository<E> {
+public interface IdempotentRepository<E> extends Service {
/**
* Adds the key to the repository.
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Mon Dec 13 12:33:39 2010
@@ -104,6 +104,12 @@ public class FileConsumerIdempotentRefTe
public boolean confirm(String key) {
return true;
}
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
}
}
\ No newline at end of file
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java (from r1045055, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java&r1=1045055&r2=1045095&rev=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedFileIdempotentConsumerTest.java Mon Dec 13 12:33:39 2010
@@ -14,10 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor;
+package org.apache.camel.management;
import java.io.File;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -25,19 +29,54 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.idempotent.FileIdempotentRepository;
import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FileUtil;
/**
* @version $Revision$
*/
-public class FileIdempotentConsumerTest extends ContextTestSupport {
+public class ManagedFileIdempotentConsumerTest extends ContextTestSupport {
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
private File store = new File("target/idempotentfilestore.dat");
private IdempotentRepository<String> repo;
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ DefaultManagementNamingStrategy naming = (DefaultManagementNamingStrategy) context.getManagementStrategy().getManagementNamingStrategy();
+ naming.setHostName("localhost");
+ naming.setDomainName("org.apache.camel");
+ return context;
+ }
+
public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+ // services
+ Set<ObjectName> names = CastUtils.cast(mbeanServer.queryNames(new ObjectName("org.apache.camel" + ":type=services,*"), null));
+ ObjectName on = null;
+ for (ObjectName name : names) {
+ if (name.toString().contains("FileIdempotentRepository")) {
+ on = name;
+ break;
+ }
+ }
+
+ assertTrue("Should be registered", mbeanServer.isRegistered(on));
+ String path = (String) mbeanServer.getAttribute(on, "FilePath");
+ assertEquals(FileUtil.normalizePath("target/idempotentfilestore.dat"), FileUtil.normalizePath(path));
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(1, size.intValue());
+
assertFalse(repo.contains("1"));
assertFalse(repo.contains("2"));
assertFalse(repo.contains("3"));
@@ -59,6 +98,24 @@ public class FileIdempotentConsumerTest
assertTrue(repo.contains("2"));
assertTrue(repo.contains("3"));
assertTrue(repo.contains("4"));
+
+ size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(4, size.intValue());
+
+ // remove one from repo
+ mbeanServer.invoke(on, "remove", new Object[]{"1"}, new String[]{"java.lang.String"});
+
+ // reset
+ mbeanServer.invoke(on, "reset", null, null);
+
+ // there should be 3 now
+ size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(3, size.intValue());
+
+ assertFalse(repo.contains("1"));
+ assertTrue(repo.contains("2"));
+ assertTrue(repo.contains("3"));
+ assertTrue(repo.contains("4"));
}
protected void sendMessage(final Object messageId, final Object body) {
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java?rev=1045095&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedMemoryIdempotentConsumerTest.java Mon Dec 13 12:33:39 2010
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.FileUtil;
+
+/**
+ * @version $Revision: 937951 $
+ */
+public class ManagedMemoryIdempotentConsumerTest extends ContextTestSupport {
+ protected Endpoint startEndpoint;
+ protected MockEndpoint resultEndpoint;
+ private IdempotentRepository<String> repo;
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = new DefaultCamelContext();
+ DefaultManagementNamingStrategy naming = (DefaultManagementNamingStrategy) context.getManagementStrategy().getManagementNamingStrategy();
+ naming.setHostName("localhost");
+ naming.setDomainName("org.apache.camel");
+ return context;
+ }
+
+ public void testDuplicateMessagesAreFilteredOut() throws Exception {
+ MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+ // services
+ Set<ObjectName> names = CastUtils.cast(mbeanServer.queryNames(new ObjectName("org.apache.camel" + ":type=services,*"), null));
+ ObjectName on = null;
+ for (ObjectName name : names) {
+ if (name.toString().contains("MemoryIdempotentRepository")) {
+ on = name;
+ break;
+ }
+ }
+ assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(1, size.intValue());
+
+ assertFalse(repo.contains("1"));
+ assertFalse(repo.contains("2"));
+ assertFalse(repo.contains("3"));
+ assertTrue(repo.contains("4"));
+
+ resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("1", "one");
+ sendMessage("2", "two");
+ sendMessage("4", "four");
+ sendMessage("1", "one");
+ sendMessage("3", "three");
+
+ resultEndpoint.assertIsSatisfied();
+
+ assertTrue(repo.contains("1"));
+ assertTrue(repo.contains("2"));
+ assertTrue(repo.contains("3"));
+ assertTrue(repo.contains("4"));
+
+ size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(4, size.intValue());
+
+ // remove one from repo
+ mbeanServer.invoke(on, "remove", new Object[]{"1"}, new String[]{"java.lang.String"});
+
+ // there should be 3 now
+ size = (Integer) mbeanServer.getAttribute(on, "CacheSize");
+ assertEquals(3, size.intValue());
+
+ assertFalse(repo.contains("1"));
+ assertTrue(repo.contains("2"));
+ assertTrue(repo.contains("3"));
+ assertTrue(repo.contains("4"));
+ }
+
+ protected void sendMessage(final Object messageId, final Object body) {
+ template.send(startEndpoint, new Processor() {
+ public void process(Exchange exchange) {
+ // now lets fire in a message
+ Message in = exchange.getIn();
+ in.setBody(body);
+ in.setHeader("messageId", messageId);
+ }
+ });
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ repo = MemoryIdempotentRepository.memoryIdempotentRepository();
+ // lets start with 4
+ repo.add("4");
+
+ super.setUp();
+ startEndpoint = resolveMandatoryEndpoint("direct:start");
+ resultEndpoint = getMockEndpoint("mock:result");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .idempotentConsumer(header("messageId"), repo)
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java Mon Dec 13 12:33:39 2010
@@ -57,6 +57,7 @@ public class FileIdempotentTrunkStoreTes
// load in new store and verify we only have the last 5 elements
IdempotentRepository<String> repo2 = FileIdempotentRepository.fileIdempotentRepository(store);
+ repo2.start();
assertFalse(repo2.contains("AAAAAAAAAA"));
assertTrue(repo2.contains("BBBBBBBBBB"));
assertTrue(repo2.contains("CCCCCCCCCC"));
@@ -92,6 +93,7 @@ public class FileIdempotentTrunkStoreTes
// 5 elements in cache, and 50 bytes as max size limit for when trunking should start
repo = FileIdempotentRepository.fileIdempotentRepository(store, 5, 50);
+ repo.start();
super.setUp();
startEndpoint = resolveMandatoryEndpoint("direct:start");
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerUsingCustomRepositoryTest.java Mon Dec 13 12:33:39 2010
@@ -89,8 +89,7 @@ public class IdempotentConsumerUsingCust
}
private final class MyRepo implements IdempotentRepository<String> {
-
- private Map<String, String> cache = new HashMap<String, String>();
+ private final Map<String, String> cache = new HashMap<String, String>();
private MyRepo() {
// pre start with 4 already in there
@@ -118,6 +117,14 @@ public class IdempotentConsumerUsingCust
// noop
return true;
}
+
+ public void start() throws Exception {
+ // noop
+ }
+
+ public void stop() throws Exception {
+ // noop
+ }
}
}
Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=1045095&r1=1045094&r2=1045095&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Mon Dec 13 12:33:39 2010
@@ -20,7 +20,11 @@ import java.util.List;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.IdempotentRepository;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.orm.jpa.JpaTemplate;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@@ -31,11 +35,12 @@ import org.springframework.transaction.s
/**
* @version $Revision$
*/
-public class JpaMessageIdRepository implements IdempotentRepository<String> {
+@ManagedResource("JpaMessageIdRepository")
+public class JpaMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> {
protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2";
- private JpaTemplate jpaTemplate;
- private String processorName;
- private TransactionTemplate transactionTemplate;
+ private final JpaTemplate jpaTemplate;
+ private final String processorName;
+ private final TransactionTemplate transactionTemplate;
public JpaMessageIdRepository(JpaTemplate template, String processorName) {
this(template, createTransactionTemplate(template), processorName);
@@ -63,6 +68,7 @@ public class JpaMessageIdRepository impl
return transactionTemplate;
}
+ @ManagedOperation(description = "Adds the key to the store")
@SuppressWarnings("unchecked")
public boolean add(final String messageId) {
// Run this in single transaction.
@@ -84,6 +90,7 @@ public class JpaMessageIdRepository impl
return rc.booleanValue();
}
+ @ManagedOperation(description = "Does the store contain the given key")
@SuppressWarnings("unchecked")
public boolean contains(final String messageId) {
// Run this in single transaction.
@@ -100,6 +107,7 @@ public class JpaMessageIdRepository impl
return rc.booleanValue();
}
+ @ManagedOperation(description = "Remove the key from the store")
@SuppressWarnings("unchecked")
public boolean remove(final String messageId) {
Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() {
@@ -123,4 +131,16 @@ public class JpaMessageIdRepository impl
return true;
}
+ @ManagedAttribute(description = "The processor name")
+ public String getProcessorName() {
+ return processorName;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ }
}