You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/09 12:12:52 UTC

svn commit: r1079749 - in /camel/trunk/components/camel-cache/src: main/java/org/apache/camel/component/cache/CacheProducer.java test/java/org/apache/camel/component/cache/CacheProducerTest.java

Author: davsclaus
Date: Wed Mar  9 11:12:51 2011
New Revision: 1079749

URL: http://svn.apache.org/viewvc?rev=1079749&view=rev
Log:
CAMEL-3371: camel-cache only mandates a message body for add/update operation. Thanks to Ulrich for patch.

Modified:
    camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java
    camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java

Modified: camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java?rev=1079749&r1=1079748&r2=1079749&view=diff
==============================================================================
--- camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java (original)
+++ camel/trunk/components/camel-cache/src/main/java/org/apache/camel/component/cache/CacheProducer.java Wed Mar  9 11:12:51 2011
@@ -26,6 +26,7 @@ import net.sf.ehcache.Ehcache;
 import net.sf.ehcache.Element;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.impl.DefaultProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,24 +99,17 @@ public class CacheProducer extends Defau
     private void performCacheOperation(Exchange exchange, String operation, String key) throws Exception {
         Object element;
 
-        Object body = exchange.getIn().getBody();
-        if (body instanceof Serializable) {
-            element = body;
-        } else {
-            InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, body);
-            // Read InputStream into a byte[] buffer
-            element = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, is);
-        }
-
         if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_ADD)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding an element with key " + key + " into the Cache");
             }
+            element = createElementFromBody(exchange, CacheConstants.CACHE_OPERATION_ADD);
             cache.put(new Element(key, element), true);
         } else if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_UPDATE)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Updating an element with key " + key + " into the Cache");
             }
+            element = createElementFromBody(exchange, CacheConstants.CACHE_OPERATION_UPDATE);
             cache.put(new Element(key, element), true);
         } else if (operation.equalsIgnoreCase(CacheConstants.CACHE_OPERATION_DELETEALL)) {
             if (LOG.isDebugEnabled()) {
@@ -151,4 +145,19 @@ public class CacheProducer extends Defau
         }
     }
 
+    private Object createElementFromBody(Exchange exchange, String cacheOperation) throws NoTypeConversionAvailableException {
+        Object element;
+        Object body = exchange.getIn().getBody();
+        if (body == null) {
+            throw new CacheException("Body cannot be null for operation " + cacheOperation);
+        } else if (body instanceof Serializable) {
+            element = body;
+        } else {
+            InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, body);
+            // Read InputStream into a byte[] buffer
+            element = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, is);
+        }
+        return element;
+    }
+
 }

Modified: camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java?rev=1079749&r1=1079748&r2=1079749&view=diff
==============================================================================
--- camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java (original)
+++ camel/trunk/components/camel-cache/src/test/java/org/apache/camel/component/cache/CacheProducerTest.java Wed Mar  9 11:12:51 2011
@@ -45,8 +45,11 @@ public class CacheProducerTest extends C
     @EndpointInject(uri = "mock:CacheProducerTest.result")
     protected MockEndpoint resultEndpoint;
 
-    @EndpointInject(uri = "mock:CacheProducerTest.exception")
-    protected MockEndpoint exceptionEndpoint;
+    @EndpointInject(uri = "mock:CacheProducerTest.cacheException")
+    protected MockEndpoint cacheExceptionEndpoint;
+
+    @EndpointInject(uri = "mock:CacheProducerTest.noTypeConversionAvailableException")
+    private MockEndpoint noTypeConversionAvailableExceptionEndpoint;
 
     @Override
     public boolean isUseRouteBuilder() {
@@ -71,6 +74,14 @@ public class CacheProducerTest extends C
         });
     }
 
+    private void sendEmptyBody() {
+        template.send("direct:a", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(null);
+            }
+        });
+    }
+
     private byte[] getFileAsByteArray(String path) throws Exception {
         // Read from an input stream
         InputStream is = new BufferedInputStream(new FileInputStream(path));
@@ -120,6 +131,28 @@ public class CacheProducerTest extends C
     }
 
     @Test
+    public void testAddingDataToCacheDoesFailOnEmptyBody() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException");
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)).
+                        setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+                        to("cache://TestCache1");
+            }
+        });
+        cacheExceptionEndpoint.expectedMessageCount(1);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Add Does Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testAddingSerializableDataToCache() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
@@ -150,6 +183,28 @@ public class CacheProducerTest extends C
     }
 
     @Test
+    public void testUpdatingDataInCacheDoesFailOnEmptyBody() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException");
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_UPDATE)).
+                        setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+                        to("cache://TestCache1");
+            }
+        });
+        cacheExceptionEndpoint.expectedMessageCount(1);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Update Does Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testDeletingDataFromCache() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
@@ -165,6 +220,28 @@ public class CacheProducerTest extends C
     }
 
     @Test
+    public void testDeletingDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException");
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETE)).
+                        setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+                        to("cache://TestCache1");
+            }
+        });
+        cacheExceptionEndpoint.expectedMessageCount(0);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Delete Does Not Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testDeletingAllDataFromCache() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
@@ -179,13 +256,34 @@ public class CacheProducerTest extends C
     }
 
     @Test
+    public void testDeletingAllDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException");
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
+                        to("cache://TestCache1");
+            }
+        });
+        cacheExceptionEndpoint.expectedMessageCount(0);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Delete All Elements Does Not Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testUnknownOperation() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
                 onException(CacheException.class).
                         handled(true).
                         to("log:*** LOGGER").
-                        to("mock:CacheProducerTest.exception");
+                        to("mock:CacheProducerTest.cacheException");
 
                 from("direct:a").
                         setHeader(CacheConstants.CACHE_OPERATION, constant("UNKNOWN")).
@@ -195,12 +293,40 @@ public class CacheProducerTest extends C
             }
         });
         resultEndpoint.expectedMessageCount(0);
-        exceptionEndpoint.expectedMessageCount(1);
+        cacheExceptionEndpoint.expectedMessageCount(1);
         context.start();
         LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
         sendUpdatedFile();
         resultEndpoint.assertIsSatisfied();
-        exceptionEndpoint.assertIsSatisfied();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testUnknownOperationDoesNotFailOnEmptyBody() throws Exception {
+        final RouteBuilder builder = new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        choice().when(exceptionMessage().isEqualTo("Operation UNKNOWN is not supported.")).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException").end();
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant("UNKNOWN")).
+                        setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+                        to("cache://TestCache1").
+                        to("mock:CacheProducerTest.result");
+            }
+        };
+        context.setTracing(true);
+        context.addRoutes(builder);
+        resultEndpoint.expectedMessageCount(0);
+        cacheExceptionEndpoint.expectedMessageCount(1);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Query An Elements Does Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        resultEndpoint.assertIsSatisfied();
+        cacheExceptionEndpoint.assertIsSatisfied();
     }
 
     @Test
@@ -210,7 +336,7 @@ public class CacheProducerTest extends C
                 onException(CacheException.class).
                         handled(true).
                         to("log:*** LOGGER").
-                        to("mock:CacheProducerTest.exception");
+                        to("mock:CacheProducerTest.cacheException");
 
                 from("direct:a").
                         setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
@@ -223,12 +349,40 @@ public class CacheProducerTest extends C
             }
         });
         resultEndpoint.expectedMessageCount(0);
-        exceptionEndpoint.expectedMessageCount(0);
+        cacheExceptionEndpoint.expectedMessageCount(0);
         context.start();
         LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
         sendUpdatedFile();
         resultEndpoint.assertIsSatisfied();
-        exceptionEndpoint.assertIsSatisfied();
+        cacheExceptionEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testQueringNonExistingDataFromCacheDoesNotFailOnEmptyBody() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                onException(CacheException.class).
+                        handled(true).
+                        to("log:*** LOGGER").
+                        to("mock:CacheProducerTest.cacheException");
+
+                from("direct:a").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_DELETEALL)).
+                        to("cache://TestCache1").
+                        setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_GET)).
+                        setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson")).
+                        to("cache://TestCache1").
+                        choice().when(header(CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNotNull()).
+                        to("mock:CacheProducerTest.result").end();
+            }
+        });
+        resultEndpoint.expectedMessageCount(0);
+        cacheExceptionEndpoint.expectedMessageCount(0);
+        context.start();
+        LOG.debug("------------Beginning CacheProducer Query An Elements Does Not Fail On Empty Body Test---------------");
+        sendEmptyBody();
+        resultEndpoint.assertIsSatisfied();
+        cacheExceptionEndpoint.assertIsSatisfied();
     }
 
     @Test
@@ -238,7 +392,7 @@ public class CacheProducerTest extends C
                 onException(CacheException.class).
                         handled(true).
                         to("log:*** LOGGER").
-                        to("mock:CacheProducerTest.exception");
+                        to("mock:CacheProducerTest.cacheException");
 
                 from("direct:a").
                         setHeader(CacheConstants.CACHE_OPERATION, constant(CacheConstants.CACHE_OPERATION_ADD)).
@@ -254,14 +408,13 @@ public class CacheProducerTest extends C
         });
 
         resultEndpoint.expectedMessageCount(1);
-        exceptionEndpoint.expectedMessageCount(0);
+        cacheExceptionEndpoint.expectedMessageCount(0);
         String body = new String(getFileAsByteArray(FILEPATH_UPDATEDTEST_TXT), "UTF-8");
         resultEndpoint.expectedBodiesReceived(body);
         context.start();
         LOG.debug("------------Beginning CacheProducer Query An Elements Test---------------");
         sendUpdatedFile();
         resultEndpoint.assertIsSatisfied();
-        exceptionEndpoint.assertIsSatisfied();
+        cacheExceptionEndpoint.assertIsSatisfied();
     }
-
 }