You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/09 12:12:52 UTC
svn commit: r1079749 - in /camel/trunk/components/camel-cache/src:
main/java/org/apache/camel/component/cache/CacheProducer.java
test/java/org/apache/camel/component/cache/CacheProducerTest.java
Author: davsclaus
Date: Wed Mar 9 11:12:51 2011
New Revision: 1079749
URL: http://svn.apache.org/viewvc?rev=1079749&view=rev
Log:
CAMEL-3371: camel-cache only mandates a message body for the add/update operations. Thanks to Ulrich for the patch.
Modified:
camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java
camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java
Modified: camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java?rev=1079749&r1=1079748&r2=1079749&view=diff
==============================================================================
--- camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java (original)
+++ camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java Wed Mar 9 11:12:51 2011
@@ -26,6 +26,7 @@ import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,24 +99,17 @@ public class CacheProducer extends Defau
private void performCacheOperation(Exchange exchange, String operation, String key) throws Exception {
Object element;
- Object body = exchange.getIn().getBody();
- if (body instanceof Serializable) {
- element = body;
- } else {
- InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, body);
- // Read InputStream into a byte[] buffer
- element = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, is);
- }
-
if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_ADD)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding an element with key " + key + " into the Cache");
}
+ element = createElementFromBody(exchange, CacheConstants.CACHE_OPERATION_ADD);
cache.put(new Element(key, element), true);
} else if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_UPDATE)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating an element with key " + key + " into the Cache");
}
+ element = createElementFromBody(exchange, CacheConstants.CACHE_OPERATION_UPDATE);
cache.put(new Element(key, element), true);
} else if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_DELETEALL)) {
if (LOG.isDebugEnabled()) {
@@ -151,4 +145,19 @@ public class CacheProducer extends Defau
}
}
+ private Object createElementFromBody(Exchange exchange, String cacheOperation) throws NoTypeConversionAvailableException {
+ Object element;
+ Object body = exchange.getIn().getBody();
+ if (body == null) {
+ throw new CacheException("Body cannot be null for operation " + cacheOperation);
+ } else if (body instanceof Serializable) {
+ element = body;
+ } else {
+ InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, body);
+ // Read InputStream into a byte[] buffer
+ element = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, is);
+ }
+ return element;
+ }
+
}
Modified: camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java?rev=1079749&r1=1079748&r2=1079749&view=diff
==============================================================================
--- camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java (original)
+++ camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java Wed Mar 9 11:12:51 2011
@@ -45,8 +45,11 @@ public class CacheProducerTest extends C
@EndpointInject(uri = "mock:CacheProducerTest.result")
protected MockEndpoint resultEndpoint;
- @EndpointInject(uri = "mock:CacheProducerTest.exception")
- protected MockEndpoint exceptionEndpoint;
+ @EndpointInject(uri = "mock:CacheProducerTest.cacheException")
+ protected MockEndpoint cacheExceptionEndpoint;
+
+ @EndpointInject(uri = "mock:CacheProducerTest.noTypeConversionAvailableException")
+ private MockEndpoint noTypeConversionAvailableExceptionEndpoint;
@Override
public boolean isUseRouteBuilder() {
@@ -71,6 +74,14 @@ public class CacheProducerTest extends C
});
}
+ private void sendEmptyBody() {
+ template.send("direct:a", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(null);
+ }
+ });
+ }
+
private byte[] getFileAsByteArray(String path) throws Exception {
// Read from an input stream
InputStream is = new BufferedInputStream(new FileInputStream(path));
@@ -120,6 +131,28 @@ public class CacheProducerTest extends C
}
@Test
+ public void testAddingDataToCacheDoesFailOnEmptyBody() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException");
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)).
+ setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+ to("cache://TestCache1");
+ }
+ });
+ cacheExceptionEndpoint.expectedMessageCount(1);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Add Does Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
public void testAddingSerializableDataToCache() throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() {
@@ -150,6 +183,28 @@ public class CacheProducerTest extends C
}
@Test
+ public void testUpdatingDataInCacheDoesFailOnEmptyBody() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException");
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_UPDATE)).
+ setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+ to("cache://TestCache1");
+ }
+ });
+ cacheExceptionEndpoint.expectedMessageCount(1);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Update Does Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
public void testDeletingDataFromCache() throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() {
@@ -165,6 +220,28 @@ public class CacheProducerTest extends C
}
@Test
+ public void testDeletingDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException");
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETE)).
+ setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+ to("cache://TestCache1");
+ }
+ });
+ cacheExceptionEndpoint.expectedMessageCount(0);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Delete Does Not Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
public void testDeletingAllDataFromCache() throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() {
@@ -179,13 +256,34 @@ public class CacheProducerTest extends C
}
@Test
+ public void testDeletingAllDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException");
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
+ to("cache://TestCache1");
+ }
+ });
+ cacheExceptionEndpoint.expectedMessageCount(0);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Delete All Elements Does Not Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
public void testUnknownOperation() throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() {
onException(CacheException.class).
handled(true).
to("log:*** LOGGER").
- to("mock:CacheProducerTest.exception");
+ to("mock:CacheProducerTest.cacheException");
from("direct:a").
setHeader(CacheConstants.CACHE_OPERATION, constant("UNKNOWN")).
@@ -195,12 +293,40 @@ public class CacheProducerTest extends C
}
});
resultEndpoint.expectedMessageCount(0);
- exceptionEndpoint.expectedMessageCount(1);
+ cacheExceptionEndpoint.expectedMessageCount(1);
context.start();
LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
sendUpdatedFile();
resultEndpoint.assertIsSatisfied();
- exceptionEndpoint.assertIsSatisfied();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
+ public void testUnknownOperationDoesNotFailOnEmptyBody() throws Exception {
+ final RouteBuilder builder = new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ choice().when(exceptionMessage().isEqualTo("Operation UNKNOWN is not supported.")).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException").end();
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant("UNKNOWN")).
+ setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+ to("cache://TestCache1").
+ to("mock:CacheProducerTest.result");
+ }
+ };
+ context.setTracing(true);
+ context.addRoutes(builder);
+ resultEndpoint.expectedMessageCount(0);
+ cacheExceptionEndpoint.expectedMessageCount(1);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Query An Elements Does Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ resultEndpoint.assertIsSatisfied();
+ cacheExceptionEndpoint.assertIsSatisfied();
}
@Test
@@ -210,7 +336,7 @@ public class CacheProducerTest extends C
onException(CacheException.class).
handled(true).
to("log:*** LOGGER").
- to("mock:CacheProducerTest.exception");
+ to("mock:CacheProducerTest.cacheException");
from("direct:a").
setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
@@ -223,12 +349,40 @@ public class CacheProducerTest extends C
}
});
resultEndpoint.expectedMessageCount(0);
- exceptionEndpoint.expectedMessageCount(0);
+ cacheExceptionEndpoint.expectedMessageCount(0);
context.start();
LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
sendUpdatedFile();
resultEndpoint.assertIsSatisfied();
- exceptionEndpoint.assertIsSatisfied();
+ cacheExceptionEndpoint.assertIsSatisfied();
+ }
+
+ @Test
+ public void testQueringNonExistingDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ onException(CacheException.class).
+ handled(true).
+ to("log:*** LOGGER").
+ to("mock:CacheProducerTest.cacheException");
+
+ from("direct:a").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
+ to("cache://TestCache1").
+ setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_GET)).
+ setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+ to("cache://TestCache1").
+ choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNotNull()).
+ to("mock:CacheProducerTest.result").end();
+ }
+ });
+ resultEndpoint.expectedMessageCount(0);
+ cacheExceptionEndpoint.expectedMessageCount(0);
+ context.start();
+ LOG.debug("------------Beginning CacheProducer Query An Elements Does Not Fail On Empty Body Test---------------");
+ sendEmptyBody();
+ resultEndpoint.assertIsSatisfied();
+ cacheExceptionEndpoint.assertIsSatisfied();
}
@Test
@@ -238,7 +392,7 @@ public class CacheProducerTest extends C
onException(CacheException.class).
handled(true).
to("log:*** LOGGER").
- to("mock:CacheProducerTest.exception");
+ to("mock:CacheProducerTest.cacheException");
from("direct:a").
setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)).
@@ -254,14 +408,13 @@ public class CacheProducerTest extends C
});
resultEndpoint.expectedMessageCount(1);
- exceptionEndpoint.expectedMessageCount(0);
+ cacheExceptionEndpoint.expectedMessageCount(0);
String body = new String(getFileAsByteArray(FILEPATH_UPDATEDTEST_TXT), "UTF-8");
resultEndpoint.expectedBodiesReceived(body);
context.start();
LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
sendUpdatedFile();
resultEndpoint.assertIsSatisfied();
- exceptionEndpoint.assertIsSatisfied();
+ cacheExceptionEndpoint.assertIsSatisfied();
}
-
}