You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by io...@apache.org on 2012/06/03 11:15:01 UTC

svn commit: r1345633 - in /camel/trunk: components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/ components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/ components/camel-jclouds/src/test/java/org/apache/camel/c...

Author: iocanel
Date: Sun Jun  3 09:15:00 2012
New Revision: 1345633

URL: http://svn.apache.org/viewvc?rev=1345633&view=rev
Log:
[CAMEL-5314] Added converter that convert files, streams etc to payload for blobstores.

Added:
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java
    camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
Removed:
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/SimpleObject.java
Modified:
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java
    camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java
    camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java
    camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java

Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sun Jun  3 09:15:00 2012
@@ -16,17 +16,22 @@
  */
 package org.apache.camel.component.jclouds;
 
+import java.io.InputStream;
 import java.util.LinkedList;
 import java.util.Queue;
 
+import com.google.common.base.Strings;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.converter.stream.CachedOutputStream;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.domain.StorageType;
 import org.jclouds.blobstore.options.ListContainerOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,21 +52,42 @@ public class JcloudsBlobStoreConsumer ex
     }
 
     @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        String container = endpoint.getContainer();
+        String locationId = endpoint.getLocationId();
+        JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId);
+    }
+
+    @Override
     protected int poll() throws Exception {
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
         Queue<Exchange> queue = new LinkedList<Exchange>();
+        String directory = endpoint.getDirectory();
 
         ListContainerOptions opt = new ListContainerOptions();
 
-        for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll))) {
+        if (!Strings.isNullOrEmpty(directory)) {
+            opt = opt.inDirectory(directory);
+        }
+
+        for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll).recursive())) {
             String blobName = md.getName();
-            Object body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader());
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setBody(body);
-            exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName);
-            queue.add(exchange);
+            if (md.getType().equals(StorageType.BLOB)) {
+                if (!Strings.isNullOrEmpty(blobName)) {
+                    InputStream body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName);
+                    if (body != null) {
+                        Exchange exchange = endpoint.createExchange();
+                        CachedOutputStream cos = new CachedOutputStream(exchange);
+                        IOHelper.copy(body, cos);
+                        exchange.getIn().setBody(cos.getStreamCache());
+                        exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName);
+                        queue.add(exchange);
+                    }
+                }
+            }
         }
         return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
     }
@@ -80,20 +106,14 @@ public class JcloudsBlobStoreConsumer ex
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
-            // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new Synchronization() {
-                public void onComplete(Exchange exchange) {
-                    String blobName = (String) exchange.getProperty(JcloudsConstants.BLOB_NAME);
-                    blobStore.removeBlob(container, blobName);
-                }
-
-                public void onFailure(Exchange exchange) {
-                 //empty method
-                }
-            });
-
             LOG.trace("Processing exchange [{}]...", exchange);
             getProcessor().process(exchange);
+            if (exchange.getException() != null) {
+                // if we failed then throw exception
+                throw exchange.getException();
+            }
+
+            blobStore.removeBlob(container, exchange.getProperty(JcloudsConstants.BLOB_NAME, String.class));
         }
 
         return total;

Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java Sun Jun  3 09:15:00 2012
@@ -24,8 +24,10 @@ import org.jclouds.blobstore.BlobStore;
 
 public class JcloudsBlobStoreEndpoint extends JcloudsEndpoint {
 
-    private String blobName;
+    private String locationId;
     private String container;
+    private String directory;
+    private String blobName;
     private String operation;
 
     private BlobStore blobStore;
@@ -52,6 +54,14 @@ public class JcloudsBlobStoreEndpoint ex
         return new JcloudsBlobStoreConsumer(this, processor, blobStore);
     }
 
+    public String getLocationId() {
+        return locationId;
+    }
+
+    public void setLocationId(String locationId) {
+        this.locationId = locationId;
+    }
+
     public String getContainer() {
         return container;
     }
@@ -60,12 +70,12 @@ public class JcloudsBlobStoreEndpoint ex
         this.container = container;
     }
 
-    public String getOperation() {
-        return operation;
+    public String getDirectory() {
+        return directory;
     }
 
-    public void setOperation(String operation) {
-        this.operation = operation;
+    public void setDirectory(String directory) {
+        this.directory = directory;
     }
 
     public String getBlobName() {
@@ -75,4 +85,12 @@ public class JcloudsBlobStoreEndpoint ex
     public void setBlobName(String blobName) {
         this.blobName = blobName;
     }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
 }

Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java Sun Jun  3 09:15:00 2012
@@ -17,17 +17,17 @@
 
 package org.apache.camel.component.jclouds;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import org.apache.camel.util.IOHelper;
+import javax.ws.rs.core.MediaType;
+import com.google.common.base.Strings;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.util.BlobStoreUtils;
+import org.jclouds.domain.Location;
+import org.jclouds.io.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;
 
 public final class JcloudsBlobStoreHelper {
 
@@ -38,30 +38,64 @@ public final class JcloudsBlobStoreHelpe
     }
 
     /**
-     * Writes payload to the the blobstore.
+     * Creates all directories that are part of the blobName.
      *
      * @param blobStore
      * @param container
      * @param blobName
-     * @param payload
      */
-    public static void writeBlob(BlobStore blobStore, String container, String blobName, Object payload) {
-        if (blobName != null && payload != null) {
-            Blob blob = blobStore.blobBuilder(blobName).build();
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = null;
-            try {
-                oos = new ObjectOutputStream(baos);
-                oos.writeObject(payload);
-                blob.setPayload(baos.toByteArray());
-                blobStore.putBlob(container, blob);
-            } catch (IOException e) {
-                LOG.error("Error while writing blob", e);
-            } finally {
-                IOHelper.close(oos, baos);
+    public static void mkDirs(BlobStore blobStore, String container, String blobName) {
+        if (blobStore != null && !Strings.isNullOrEmpty(blobName) && blobName.contains("/")) {
+            String directory = BlobStoreUtils.parseDirectoryFromPath(blobName);
+            blobStore.createDirectory(container, directory);
+        }
+    }
+
+    /**
+     * Checks if container exists and creates one if not.
+     *
+     * @param blobStore  The {@link BlobStore} to use.
+     * @param container  The container name to check against.
+     * @param locationId The locationId to create the container if not found.
+     */
+    public static void ensureContainerExists(BlobStore blobStore, String container, String locationId) {
+        if (blobStore != null && !Strings.isNullOrEmpty(container) && !blobStore.containerExists(container)) {
+            blobStore.createContainerInLocation(getLocationById(blobStore, locationId), container);
+        }
+    }
+
+    /**
+     * Returns the {@link Location} that matches the locationId.
+     *
+     * @param blobStore
+     * @param locationId
+     * @return
+     */
+    public static Location getLocationById(BlobStore blobStore, String locationId) {
+        if (blobStore != null && !Strings.isNullOrEmpty(locationId)) {
+            for (Location location : blobStore.listAssignableLocations()) {
+                if (locationId.equals(location.getId())) {
+                    return location;
+                }
             }
         }
+        return null;
+    }
 
+    /**
+     * Writes {@link Payload} to the the {@link BlobStore}.
+     *
+     * @param blobStore
+     * @param container
+     * @param blobName
+     * @param payload
+     */
+    public static void writeBlob(BlobStore blobStore, String container, String blobName, Payload payload) {
+        if (blobName != null && payload != null) {
+            mkDirs(blobStore, container, blobName);
+            Blob blob = blobStore.blobBuilder(blobName).payload(payload).contentType(MediaType.APPLICATION_OCTET_STREAM).contentDisposition(blobName).build();
+            blobStore.putBlob(container, blob, multipart());
+        }
     }
 
     /**
@@ -71,34 +105,14 @@ public final class JcloudsBlobStoreHelpe
      * @param blobName
      * @return
      */
-    public static Object readBlob(BlobStore blobStore, String container, String blobName, final ClassLoader classLoader) {
-        Object result = null;
-        ObjectInputStream ois = null;
-        blobStore.createContainerInLocation(null, container);
-
-        InputStream is = blobStore.getBlob(container, blobName).getPayload().getInput();
-
-        try {
-            ois = new ObjectInputStream(is) {
-                @Override
-                public Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-                    try {
-                        return classLoader.loadClass(desc.getName());
-                    } catch (Exception e) {
-                    }
-                    return super.resolveClass(desc);
-                }
-            };
-            result = ois.readObject();
-        } catch (IOException
-                e) {
-            e.printStackTrace();
-        } catch (ClassNotFoundException
-                e) {
-            e.printStackTrace();
-        } finally {
-            IOHelper.close(ois, is);
+    public static InputStream readBlob(BlobStore blobStore, String container, String blobName) {
+        InputStream is = null;
+        if (!Strings.isNullOrEmpty(blobName)) {
+            Blob blob = blobStore.getBlob(container, blobName);
+            if (blob != null && blob.getPayload() != null) {
+                is = blobStore.getBlob(container, blobName).getPayload().getInput();
+            }
         }
-        return result;
+        return is;
     }
 }

Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java Sun Jun  3 09:15:00 2012
@@ -19,6 +19,7 @@ package org.apache.camel.component.jclou
 import org.apache.camel.Exchange;
 
 import org.jclouds.blobstore.BlobStore;
+import org.jclouds.io.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,11 +27,21 @@ public class JcloudsBlobStoreProducer ex
 
     private static final Logger LOG = LoggerFactory.getLogger(JcloudsBlobStoreProducer.class);
 
+    private final JcloudsBlobStoreEndpoint endpoint;
     private BlobStore blobStore;
 
-    public JcloudsBlobStoreProducer(JcloudsEndpoint endpoint, BlobStore blobStore) {
+    public JcloudsBlobStoreProducer(JcloudsBlobStoreEndpoint endpoint, BlobStore blobStore) {
         super(endpoint);
         this.blobStore = blobStore;
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        String container = endpoint.getContainer();
+        String locationId = endpoint.getLocationId();
+        JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId);
     }
 
     @Override
@@ -40,10 +51,10 @@ public class JcloudsBlobStoreProducer ex
         String operation = getOperation(exchange);
 
         LOG.trace("Processing {} operation on '{}'", operation, container + "/" + blobName);
-        Object body = exchange.getIn().getBody();
         if (JcloudsConstants.GET.equals(operation)) {
-            exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader()));
+            exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName));
         } else {
+            Payload body = exchange.getIn().getBody(Payload.class);
             JcloudsBlobStoreHelper.writeBlob(blobStore, container, blobName, body);
         }
     }
@@ -90,4 +101,19 @@ public class JcloudsBlobStoreProducer ex
         }
         return operation;
     }
+
+    /**
+     * Retrieves the locationId from the URI or from the exchange headers. The header will take precedence over the URI.
+     *
+     * @param exchange
+     * @return
+     */
+    public String getLocationId(Exchange exchange) {
+        String operation = ((JcloudsBlobStoreEndpoint) getEndpoint()).getLocationId();
+
+        if (exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID) != null) {
+            operation = (String) exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID);
+        }
+        return operation;
+    }
 }

Added: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java?rev=1345633&view=auto
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java (added)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java Sun Jun  3 09:15:00 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jclouds;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.InputSupplier;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
+import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.converter.stream.StreamSourceCache;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.util.IOHelper;
+import org.jclouds.io.Payload;
+import org.jclouds.io.payloads.ByteArrayPayload;
+import org.jclouds.io.payloads.FilePayload;
+import org.jclouds.io.payloads.InputStreamPayload;
+import org.jclouds.io.payloads.StringPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Converter
+public final class JcloudsPayloadConverter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JcloudsPayloadConverter.class);
+
+    private JcloudsPayloadConverter() {
+        //Utility Class
+    }
+
+    @Converter
+    public static Payload toPayload(byte[] bytes) {
+        return new ByteArrayPayload(bytes);
+    }
+
+    @Converter
+    public static Payload toPayload(String str) {
+        return new StringPayload(str);
+    }
+
+    @Converter
+    public static Payload toPayload(File file) {
+        return new FilePayload(file);
+    }
+
+    @Converter
+    public static Payload toPayload(InputStream is, Exchange exchange) throws IOException {
+        if (is.markSupported()) {
+            InputStreamPayload payload = new InputStreamPayload(is);
+            long contentLength = ByteStreams.length(payload);
+            is.reset();
+            payload.getContentMetadata().setContentLength(contentLength);
+            return payload;
+        } else {
+            CachedOutputStream cos = new CachedOutputStream(exchange);
+            return toPayload(cos.getWrappedInputStream(), exchange);
+        }
+    }
+
+    @Converter
+    public static Payload toPayload(StreamSource source, Exchange exchange) throws IOException {
+        return toPayload(new StreamSourceCache(source, exchange));
+    }
+
+    @Converter
+    public static Payload toPayload(final StreamSourceCache cache) throws IOException {
+        long contentLength = ByteStreams.length(new InputSupplier<InputStream>() {
+            @Override
+            public InputStream getInput() throws IOException {
+                return cache.getInputStream();
+            }
+        });
+        cache.reset();
+        InputStreamPayload payload = new InputStreamPayload(cache.getInputStream());
+        payload.getContentMetadata().setContentLength(contentLength);
+        return payload;
+    }
+
+    @FallbackConverter
+    public static <T extends Payload> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) throws IOException {
+        Class sourceType = value.getClass();
+        if (GenericFile.class.isAssignableFrom(sourceType)) {
+            GenericFile genericFile = (GenericFile) value;
+            if (genericFile.getFile() != null) {
+                Class genericFileType = genericFile.getFile().getClass();
+                TypeConverter converter = registry.lookup(Payload.class, genericFileType);
+                if (converter != null) {
+                    return (T) converter.convertTo(Payload.class, genericFile.getFile());
+                }
+            }
+        }
+        return null;
+    }
+}

Added: camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=1345633&view=auto
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (added)
+++ camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Sun Jun  3 09:15:00 2012
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.camel.component.jclouds.JcloudsPayloadConverter
\ No newline at end of file

Modified: camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java (original)
+++ camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java Sun Jun  3 09:15:00 2012
@@ -24,6 +24,7 @@ import org.apache.camel.test.junit4.Came
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.BlobStoreContextFactory;
+import org.jclouds.io.payloads.StringPayload;
 import org.junit.Test;
 
 public class JcloudsBlobStoreConsumerTest extends CamelTestSupport {
@@ -32,6 +33,10 @@ public class JcloudsBlobStoreConsumerTes
     private static final String TEST_BLOB1 = "testBlob1";
     private static final String TEST_BLOB2 = "testBlob2";
 
+    private static final String TEST_CONTAINER_WITH_DIR = "testContainerWithDirectories";
+    private static final String TEST_BLOB_IN_DIR = "dir/testBlob";
+    private static final String TEST_BLOB_IN_OTHER = "other/testBlob";
+
     BlobStoreContextFactory contextFactory = new BlobStoreContextFactory();
     BlobStoreContext blobStoreContext = contextFactory.createContext("transient", "identity", "credential");
     BlobStore blobStore = blobStoreContext.getBlobStore();
@@ -39,7 +44,7 @@ public class JcloudsBlobStoreConsumerTes
     @Test
     public void testBlobStoreGetOneBlob() throws InterruptedException {
         String message = "Some message";
-        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, message);
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, new StringPayload(message));
 
         MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class);
         mockEndpoint.expectedMessageCount(1);
@@ -53,30 +58,61 @@ public class JcloudsBlobStoreConsumerTes
     @Test
     public void testBlobStoreGetTwoBlobs() throws InterruptedException {
         String message1 = "Blob 1";
-        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, message1);
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, new StringPayload(message1));
 
         String message2 = "Blob 2";
-        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB2, message2);
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB2, new StringPayload(message2));
 
         MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class);
         mockEndpoint.expectedMessageCount(2);
         mockEndpoint.expectedBodiesReceived(message1, message2);
 
         mockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testBlobStoreWithDirectory() throws InterruptedException {
+        String message1 = "Blob in directory";
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_DIR, new StringPayload(message1));
 
+        MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results-in-dir", MockEndpoint.class);
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedBodiesReceived(message1);
+
+        mockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testBlobStoreWithMultipleDirectories() throws InterruptedException {
+        String message1 = "Blob in directory";
+        String message2 = "Blob in other directory";
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_DIR, new StringPayload(message1));
+        JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_OTHER, new StringPayload(message2));
+
+        MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results-in-dir", MockEndpoint.class);
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedBodiesReceived(message1);
+
+        mockEndpoint.assertIsSatisfied();
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
 
         blobStore.createContainerInLocation(null, TEST_CONTAINER);
+        blobStore.createContainerInLocation(null, TEST_CONTAINER_WITH_DIR);
         ((JcloudsComponent) context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore));
 
         return new RouteBuilder() {
             public void configure() {
 
                 from("jclouds:blobstore:transient?container=" + TEST_CONTAINER)
+                        .convertBodyTo(String.class)
                         .to("mock:results");
+
+                from("jclouds:blobstore:transient?container=" + TEST_CONTAINER_WITH_DIR + "&directory=dir")
+                        .convertBodyTo(String.class)
+                        .to("mock:results-in-dir");
             }
         };
     }

Modified: camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java (original)
+++ camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java Sun Jun  3 09:15:00 2012
@@ -17,19 +17,31 @@
 
 package org.apache.camel.component.jclouds;
 
+import java.io.ByteArrayInputStream;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.sax.SAXSource;
+import org.xml.sax.InputSource;
 import com.google.common.collect.Lists;
+import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.StreamCacheConverter;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.BlobStoreContextFactory;
 import org.junit.Test;
 
+
+
 public class JcloudsBlobStoreProducerTest extends CamelTestSupport {
 
     private static final String TEST_CONTAINER = "testContainer";
     private static final String TEST_BLOB = "testBlob";
+    private static final String TEST_BLOB_IN_DIR = "/dir/testBlob";
+    private static final String MESSAGE = "<test>This is a test</test>";
 
     BlobStoreContextFactory contextFactory = new BlobStoreContextFactory();
     BlobStoreContext blobStoreContext = contextFactory.createContext("transient", "identity", "credential");
@@ -47,26 +59,36 @@ public class JcloudsBlobStoreProducerTes
     public void testBlobStorePutAndGet() throws InterruptedException {
         String message = "Some message";
         template.sendBody("direct:put-and-get", message);
-        Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET);
+        Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class);
         assertEquals(message, result);
     }
 
+    @Test
+    public void testBlobStorePutWithStreamAndGet() throws InterruptedException, TransformerException {
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes());
+        Exchange exchange = new DefaultExchange(context);
+        StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange);
+        template.sendBody("direct:put-and-get", streamCache);
+        Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class);
+        assertEquals(MESSAGE, result);
+    }
+
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
 
         blobStore.createContainerInLocation(null, TEST_CONTAINER);
-        ((JcloudsComponent)context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore));
+        ((JcloudsComponent) context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore));
 
         return new RouteBuilder() {
             public void configure() {
                 from("direct:put")
-                        .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB))
+                        .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR))
                         .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER))
                         .to("jclouds:blobstore:transient").to("mock:results");
 
                 from("direct:put-and-get")
-                        .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB))
+                        .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR))
                         .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER))
                         .to("jclouds:blobstore:transient");
             }

Modified: camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java (original)
+++ camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java Sun Jun  3 09:15:00 2012
@@ -70,8 +70,8 @@ public class BlobStoreBlueprintRouteTest
         MockEndpoint mock = ctx.getEndpoint("mock:results", MockEndpoint.class);
         ProducerTemplate template = ctx.createProducerTemplate();
         mock.expectedMessageCount(2);
-        template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1");
-        template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2");
+        template.sendBodyAndHeader("direct:start", "Test 1", JcloudsConstants.BLOB_NAME, "blob1");
+        template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2");
         assertMockEndpointsSatisfied();
     }
 
@@ -81,7 +81,6 @@ public class BlobStoreBlueprintRouteTest
                 getDefaultCamelKarafOptions(),
                 //Helper.setLogLevel("INFO"),
                 provision(newBundle()
-                        .add(SimpleObject.class)
                         .add("META-INF/persistence.xml", BlobStoreBlueprintRouteTest.class.getResource("/META-INF/persistence.xml"))
                         .add("OSGI-INF/blueprint/test.xml", BlobStoreBlueprintRouteTest.class.getResource("blueprintCamelContext.xml"))
                         .set(Constants.BUNDLE_SYMBOLICNAME, "CamelBlueprintJcloudsTestBundle")

Modified: camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java (original)
+++ camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java Sun Jun  3 09:15:00 2012
@@ -63,8 +63,8 @@ public class BlobStoreRouteTest extends 
     public void testProducerAndConsumer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:results");
         mock.expectedMessageCount(2);
-        template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1");
-        template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2");
+        template.sendBodyAndHeader("direct:start", "Test 1", JcloudsConstants.BLOB_NAME, "blob1");
+        template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2");
         assertMockEndpointsSatisfied();
     }