You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/25 10:46:27 UTC

svn commit: r1038971 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/blob/BlobTransferPolicy.java main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java test/java/org/apache/activemq/blob/FilesystemBlobTest.java

Author: dejanb
Date: Thu Nov 25 09:46:27 2010
New Revision: 1038971

URL: http://svn.apache.org/viewvc?rev=1038971&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3017 - filesystem blob strategy

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=1038971&r1=1038970&r2=1038971&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java Thu Nov 25 09:46:27 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.blob;
 
 import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.net.URL;
 
 /**
@@ -132,11 +133,15 @@ public class BlobTransferPolicy {
 
             if(url.getProtocol().equalsIgnoreCase("FTP")) {
                 strategy = new FTPBlobUploadStrategy(this);
+            } else if (url.getProtocol().equalsIgnoreCase("FILE")) {
+                strategy = new FileSystemBlobStrategy(this);
             } else {
                 strategy = new DefaultBlobUploadStrategy(this);
             }
         } catch (MalformedURLException e) {
-                strategy = new DefaultBlobUploadStrategy(this);
+            strategy = new DefaultBlobUploadStrategy(this);
+        } catch (URISyntaxException e) {
+            strategy = new DefaultBlobUploadStrategy(this);
         }
         return strategy;
     }
@@ -154,11 +159,15 @@ public class BlobTransferPolicy {
             
             if(url.getProtocol().equalsIgnoreCase("FTP")) {
                 strategy = new FTPBlobDownloadStrategy(this);
+            } else if (url.getProtocol().equalsIgnoreCase("FILE")) {
+                strategy = new FileSystemBlobStrategy(this);
             } else {
                 strategy = new DefaultBlobDownloadStrategy(this);
             }
         } catch (MalformedURLException e) {
             strategy = new DefaultBlobDownloadStrategy(this);
+        } catch (URISyntaxException e) {
+            strategy = new DefaultBlobDownloadStrategy(this);
         }
         return strategy;
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java?rev=1038971&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java Thu Nov 25 09:46:27 2010
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * {@link BlobUploadStrategy} and {@link BlobDownloadStrategy} implementation that stores blob payloads as
+ * files on the local filesystem, below the directory named by the policy's upload URL.
+ */
+public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadStrategy {
+
+    private final BlobTransferPolicy policy;
+    private File rootFile;
+
+    /**
+     * Creates the strategy and prepares the root folder via {@link #createRootFolder()}.
+     *
+     * @param policy transfer policy supplying the upload URL and the copy buffer size
+     * @throws MalformedURLException if the policy's upload URL cannot be parsed
+     * @throws URISyntaxException if the policy's upload URL is not a valid URI
+     */
+    public FileSystemBlobStrategy(final BlobTransferPolicy policy) throws MalformedURLException, URISyntaxException {
+        this.policy = policy;
+        createRootFolder();
+    }
+
+    /**
+     * Resolves the policy's upload URL to a directory and creates it if it does not exist yet.
+     *
+     * @throws MalformedURLException if the upload URL cannot be parsed
+     * @throws URISyntaxException if the upload URL is not a valid URI
+     * @throws IllegalArgumentException if the upload URL names an existing file that is not a directory
+     */
+    protected void createRootFolder() throws MalformedURLException, URISyntaxException {
+        rootFile = new File(new URL(policy.getUploadUrl()).toURI());
+        if (!rootFile.exists()) {
+            // NOTE(review): the mkdirs() result is not checked, so a failure only surfaces on the first upload
+            rootFile.mkdirs();
+        } else if (!rootFile.isDirectory()) {
+            throw new IllegalArgumentException("Given url is not a directory " + rootFile );
+        }
+    }
+
+    /**
+     * Copies the given file into the blob store. The stream opened on {@code file} is always closed.
+     *
+     * @see BlobUploadStrategy#uploadFile(ActiveMQBlobMessage, File)
+     */
+    public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
+        InputStream in = new FileInputStream(file);
+        try {
+            return uploadStream(message, in);
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Copies {@code in} into the file backing the message and returns that file's URL. The output file is
+     * always closed, even when the copy fails; {@code in} is left open for the caller to close.
+     *
+     * @see BlobUploadStrategy#uploadStream(ActiveMQBlobMessage, InputStream)
+     */
+    public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException {
+        File f = getFile(message);
+        FileOutputStream out = new FileOutputStream(f);
+        try {
+            byte[] buffer = new byte[policy.getBufferSize()];
+            for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+                out.write(buffer, 0, c);
+            }
+            out.flush();
+        } finally {
+            out.close();
+        }
+        // File.toURL() is deprecated
+        return f.toURI().toURL();
+    }
+
+    /**
+     * Deletes the file backing the message; a file that does not exist is silently ignored.
+     *
+     * @throws IOException if the file exists but cannot be deleted
+     * @see BlobDownloadStrategy#deleteFile(ActiveMQBlobMessage)
+     */
+    public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
+        File f = getFile(message);
+        if (f.exists() && !f.delete()) {
+            throw new IOException("Unable to delete file " + f);
+        }
+    }
+
+    /**
+     * Returns a {@link FileInputStream} for the given {@link ActiveMQBlobMessage}; the caller must close it.
+     */
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
+        return new FileInputStream(getFile(message));
+    }
+
+    /**
+     * Returns the {@link File} for the message: the file named by the message URL when one is set, otherwise
+     * a file under the root folder named after the JMS message id.
+     *
+     * @throws IOException if the message URL cannot be converted to a URI
+     */
+    protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
+        // NOTE(review): the URL is taken from the message as-is and may point outside the root folder
+        if (message.getURL() != null) {
+            try {
+                return new File(message.getURL().toURI());
+            } catch (URISyntaxException e) {
+                throw new IOException("Unable to open file for message " + message ,e);
+            }
+        }
+        //replace all : with _ to make windows more happy
+        String fileName = message.getJMSMessageID().replace(':', '_');
+        return new File(rootFile, fileName);
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java?rev=1038971&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java Thu Nov 25 09:46:27 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.util.IOHelper;
+
+
+/**
+ * Sends a {@link BlobMessage} over a connection whose blob upload URL is a {@code file:} directory and checks
+ * that the payload can be read back and that the stored file is removed by {@code deleteFile()}.
+ */
+public class FilesystemBlobTest extends EmbeddedBrokerTestSupport {
+
+    private Connection connection;
+    private String tmpDir =  System.getProperty("user.dir") + "/target/FilesystemBlobTest";
+
+    public void setUp() throws Exception {
+        super.setUp();
+        // replace \ with / to let it work on windows too
+        String fileUrl = "file:///" +tmpDir.replaceAll("\\\\", "/");
+        bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + fileUrl;
+
+        connectionFactory = createConnectionFactory();
+
+        connection = createConnection();
+        connection.start();
+    }
+
+    public void testBlobFile() throws Exception {
+        // first create Message
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        try {
+            // lets write some data
+            String content = "hello world " + System.currentTimeMillis();
+            BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+            try {
+                writer.append(content);
+            } finally {
+                writer.close();
+            }
+
+            ActiveMQSession session = (ActiveMQSession) connection.createSession(
+                    false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            MessageConsumer consumer = session.createConsumer(destination);
+            BlobMessage message = session.createBlobMessage(file);
+
+            producer.send(message);
+            Thread.sleep(1000);
+
+            // check message send
+            Message msg = consumer.receive(1000);
+            Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
+
+            InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+            StringBuilder b = new StringBuilder();
+            try {
+                for (int i = input.read(); i != -1; i = input.read()) {
+                    b.append((char) i);
+                }
+            } finally {
+                input.close();
+            }
+            File uploaded = new File(tmpDir, msg.getJMSMessageID().replace(":", "_"));
+            Assert.assertEquals(content, b.toString());
+            Assert.assertTrue(uploaded.exists());
+            ((ActiveMQBlobMessage) msg).deleteFile();
+            Assert.assertFalse(uploaded.exists());
+        } finally {
+            // the temporary source file is not needed once the test body has run
+            file.delete();
+        }
+    }
+
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                // close() instead of stop(): stop() only pauses delivery and leaves the connection open
+                connection.close();
+            }
+        } finally {
+            // run the parent cleanup even if closing the connection failed
+            super.tearDown();
+            IOHelper.deleteFile(new File(tmpDir));
+        }
+    }
+}
\ No newline at end of file