You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/11/25 10:46:27 UTC
svn commit: r1038971 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/blob/BlobTransferPolicy.java
main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java
test/java/org/apache/activemq/blob/FilesystemBlobTest.java
Author: dejanb
Date: Thu Nov 25 09:46:27 2010
New Revision: 1038971
URL: http://svn.apache.org/viewvc?rev=1038971&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3017 - fllesystem blob strategy
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=1038971&r1=1038970&r2=1038971&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java Thu Nov 25 09:46:27 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.blob;
import java.net.MalformedURLException;
+import java.net.URISyntaxException;
import java.net.URL;
/**
@@ -132,11 +133,15 @@ public class BlobTransferPolicy {
if(url.getProtocol().equalsIgnoreCase("FTP")) {
strategy = new FTPBlobUploadStrategy(this);
+ } else if (url.getProtocol().equalsIgnoreCase("FILE")) {
+ strategy = new FileSystemBlobStrategy(this);
} else {
strategy = new DefaultBlobUploadStrategy(this);
}
} catch (MalformedURLException e) {
- strategy = new DefaultBlobUploadStrategy(this);
+ strategy = new DefaultBlobUploadStrategy(this);
+ } catch (URISyntaxException e) {
+ strategy = new DefaultBlobUploadStrategy(this);
}
return strategy;
}
@@ -154,11 +159,15 @@ public class BlobTransferPolicy {
if(url.getProtocol().equalsIgnoreCase("FTP")) {
strategy = new FTPBlobDownloadStrategy(this);
+ } else if (url.getProtocol().equalsIgnoreCase("FILE")) {
+ strategy = new FileSystemBlobStrategy(this);
} else {
strategy = new DefaultBlobDownloadStrategy(this);
}
} catch (MalformedURLException e) {
strategy = new DefaultBlobDownloadStrategy(this);
+ } catch (URISyntaxException e) {
+ strategy = new DefaultBlobDownloadStrategy(this);
}
return strategy;
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java?rev=1038971&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FileSystemBlobStrategy.java Thu Nov 25 09:46:27 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * {@link BlobUploadStrategy} and {@link BlobDownloadStrategy} implementation which use the local filesystem for storing
+ * the payload
+ *
+ */
+public class FileSystemBlobStrategy implements BlobUploadStrategy, BlobDownloadStrategy{
+
+
+ private final BlobTransferPolicy policy;
+ private File rootFile;
+
+ public FileSystemBlobStrategy(final BlobTransferPolicy policy) throws MalformedURLException, URISyntaxException {
+ this.policy = policy;
+
+ createRootFolder();
+ }
+
+ /**
+ * Create the root folder if not exist
+ *
+ * @throws MalformedURLException
+ * @throws URISyntaxException
+ */
+ protected void createRootFolder() throws MalformedURLException, URISyntaxException {
+ rootFile = new File(new URL(policy.getUploadUrl()).toURI());
+ if (rootFile.exists() == false) {
+ rootFile.mkdirs();
+ } else if (rootFile.isDirectory() == false) {
+ throw new IllegalArgumentException("Given url is not a directory " + rootFile );
+ }
+ }
+ /*
+ * (non-Javadoc)
+ * @see org.apache.activemq.blob.BlobUploadStrategy#uploadFile(org.apache.activemq.command.ActiveMQBlobMessage, java.io.File)
+ */
+ public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
+ return uploadStream(message, new FileInputStream(file));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.activemq.blob.BlobUploadStrategy#uploadStream(org.apache.activemq.command.ActiveMQBlobMessage, java.io.InputStream)
+ */
+ public URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException {
+ File f = getFile(message);
+ FileOutputStream out = new FileOutputStream(f);
+ byte[] buffer = new byte[policy.getBufferSize()];
+ for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+ out.write(buffer, 0, c);
+ out.flush();
+ }
+ out.flush();
+ out.close();
+ // File.toURL() is deprecated
+ return f.toURI().toURL();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.activemq.blob.BlobDownloadStrategy#deleteFile(org.apache.activemq.command.ActiveMQBlobMessage)
+ */
+ public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
+ File f = getFile(message);
+ if (f.exists()) {
+ if (f.delete() == false) throw new IOException("Unable to delete file " + f);
+ }
+ }
+
+ /**
+ * Returns a {@link FileInputStream} for the give {@link ActiveMQBlobMessage}
+ */
+ public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
+ return new FileInputStream(getFile(message));
+ }
+
+
+ /**
+ * Return the {@link File} for the {@link ActiveMQBlobMessage}.
+ *
+ * @param message
+ * @return file
+ * @throws JMSException
+ * @throws IOException
+ */
+ protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
+ if (message.getURL() != null) {
+ try {
+ return new File(message.getURL().toURI());
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to open file for message " + message ,e);
+ }
+ }
+ //replace all : with _ to make windows more happy
+ String fileName = message.getJMSMessageID().replaceAll(":", "_");
+ return new File(rootFile, fileName);
+
+ }
+}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java?rev=1038971&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java Thu Nov 25 09:46:27 2010
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.util.IOHelper;
+
+
+public class FilesystemBlobTest extends EmbeddedBrokerTestSupport {
+
+
+ private Connection connection;
+ private String tmpDir = System.getProperty("user.dir") + "/target/FilesystemBlobTest";
+ public void setUp() throws Exception {
+ super.setUp();
+ // replace \ with / to let it work on windows too
+ String fileUrl = "file:///" +tmpDir.replaceAll("\\\\", "/");
+ bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + fileUrl;
+
+ connectionFactory = createConnectionFactory();
+
+ connection = createConnection();
+ connection.start();
+ }
+
+
+ public void testBlobFile() throws Exception {
+ // first create Message
+ File file = File.createTempFile("amq-data-file-", ".dat");
+ // lets write some data
+ String content = "hello world " + System.currentTimeMillis();
+ BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+ writer.append(content);
+ writer.close();
+
+ ActiveMQSession session = (ActiveMQSession) connection.createSession(
+ false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ MessageConsumer consumer = session.createConsumer(destination);
+ BlobMessage message = session.createBlobMessage(file);
+
+ producer.send(message);
+ Thread.sleep(1000);
+
+ // check message send
+ Message msg = consumer.receive(1000);
+ Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
+
+ InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+ StringBuilder b = new StringBuilder();
+ int i = input.read();
+ while (i != -1) {
+ b.append((char) i);
+ i = input.read();
+ }
+ input.close();
+ File uploaded = new File(tmpDir, msg.getJMSMessageID().toString().replace(":", "_"));
+ Assert.assertEquals(content, b.toString());
+ assertTrue(uploaded.exists());
+ ((ActiveMQBlobMessage)msg).deleteFile();
+ assertFalse(uploaded.exists());
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.stop();
+ }
+ super.tearDown();
+
+ IOHelper.deleteFile(new File(tmpDir));
+ }
+}
\ No newline at end of file