You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by luodaidong <lu...@sina.com> on 2013/05/23 05:51:23 UTC

about blobmessage

I use activeMQ blobmessage tranfering large file. It's OK, but the receiver
can receive file only when the sender has sended the file completly. How
about realtime transfer large file? where can I configure this?
the test code following,where is wrong?

the sender

package mytest.file;  
  
import java.io.File;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.swing.JFileChooser;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
  
/** 
 * 通过 ActiveMQ 发送文件的程序 
 *  
 * @author hailiang 
 */  
public class FileSender {  
  
    /** 
     * @param args 
     * @throws JMSException 
     */  
    public static void main(String[] args) throws JMSException {  
		System.out.println("1111");
        // 选择文件   
        JFileChooser fileChooser = new JFileChooser();  
        fileChooser.setDialogTitle("选择文件");  
        if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION)
{  
            return;  
        }  
        File file = fileChooser.getSelectedFile();  
  
        // 获取 ConnectionFactory   
        ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("admin","admin" , 
               
"tcp://192.168.17.4:61616?jms.blobTransferPolicy.defaultUploadUrl=http://admin:admin@192.168.17.4:8161/fileserver/");  
  
        // 创建 Connection   
        Connection connection = connectionFactory.createConnection();  
        connection.start();  
  
        // 创建 Session   
        ActiveMQSession session = (ActiveMQSession)
connection.createSession(  
                false, Session.AUTO_ACKNOWLEDGE);  
  
        // 创建 Destination   
        Destination destination = session.createQueue("File.Transport");  
  
        // 创建 Producer   
        MessageProducer producer = session.createProducer(destination);  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置为非持久性   
        // 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件   
  
        // 构造 BlobMessage,用来传输文件   
        BlobMessage blobMessage = session.createBlobMessage(file);  
        blobMessage.setStringProperty("FILE.NAME", file.getName());  
        blobMessage.setLongProperty("FILE.SIZE", file.length());  
        System.out.println("开始发送文件:" + file.getName() + ",文件大小:"  
                + file.length() + " 字节");  
  
        // 7. 发送文件   
        producer.send(blobMessage);  
        System.out.println("完成文件发送:" + file.getName());  
  
        producer.close();  
        session.close();  
        connection.close(); // 不关闭 Connection, 程序则不退出   
    }  
}  


the receiver
package mytest.file;    
import java.io.*;  
import javax.jms.*;  
import javax.jms.Message;  
import javax.swing.*;  
import org.apache.activemq.*;    

  
public class FileReciever {  
  
    /** 
     * @param args 
     * @throws JMSException 
     */  
    public static void main(String[] args) throws JMSException {  
  
        // 获取 ConnectionFactory   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
                "tcp://192.168.17.4:61616");  
  
        // 创建 Connection   
        Connection connection = connectionFactory.createConnection();  
        connection.start();  
  
        // 创建 Session   
        Session session = connection.createSession(false,  
                Session.AUTO_ACKNOWLEDGE);  
  
        // 创建 Destinatione   
        Destination destination = session.createQueue("File.Transport");  
  
        // 创建 Consumer   
        MessageConsumer consumer = session.createConsumer(destination);  
  
        // 注册消息监听器,当消息到达时被触发并处理消息   
        consumer.setMessageListener(new MessageListener() {  
  
            // 监听器中处理消息   
            public void onMessage(Message message) {  
                if (message instanceof BlobMessage) {  
                    BlobMessage blobMessage = (BlobMessage) message;  
                    try {  
                        String fileName = blobMessage  
                                .getStringProperty("FILE.NAME");  
                        System.out.println("文件接收请求处理:" + fileName + ",文件大小:"  
                                + blobMessage.getLongProperty("FILE.SIZE")  
                                + " 字节");  
  
                        JFileChooser fileChooser = new JFileChooser();  
                        fileChooser.setDialogTitle("请指定文件保存位置");  
                        fileChooser.setSelectedFile(new File(fileName));  
                        if (fileChooser.showSaveDialog(null) ==
JFileChooser.APPROVE_OPTION) {  
                            File file = fileChooser.getSelectedFile();  
                            OutputStream os = new FileOutputStream(file);  
  
                            System.out.println("开始接收文件:" + fileName);  
                            InputStream inputStream = blobMessage  
                                    .getInputStream();  
  
                            // 写文件,你也可以使用其他方式   
                            byte[] buff = new byte[256];  
                            int len = 0;  
                            while ((len = inputStream.read(buff)) > 0) {  
                                os.write(buff, 0, len);  
                            }  
                            os.close();  
                            System.out.println("完成文件接收:" + fileName);  
                        }  
  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
        });  
    }  
}  




--
View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re:Re: Re:Re: Re:about blobmessage

Posted by SuoNayi <su...@163.com>.
Can you detail how you define the realtime transfer file?
As far as I know. OICQ does not support realtime transfer as well.
For BlobMessage AMQ employs the third-party file service to upload the file at first and 
then send a event to the receiver to be known that there'is a file need be downloaded.
For JMS Streams,the file will be split into pieces and sent piece by piece so the receiver 
can stream these pieces by receiving them one by one.



At 2013-05-24 17:58:36,luodaidong <lu...@sina.com> wrote:
>If ActiveMQ BlobMessage support realtime transfer file, this can help for IM
>like QQ.
>
>
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667453.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Re:Re: Re:about blobmessage

Posted by luodaidong <lu...@sina.com>.
If ActiveMQ BlobMessage support realtime transfer file, this can help for IM
like QQ.



--
View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667453.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Re:Re: Re:about blobmessage

Posted by Christian Posta <ch...@gmail.com>.
Why does this need to be a messaging solution?


On Thu, May 23, 2013 at 12:47 AM, luodaidong <lu...@sina.com> wrote:

> I am newer for ActiveMQ, I use following code for streaming file.but every
> time the receiver just receive 768K, and then stop there. But the sender is
> still send data. I don't know why? I also saw someone discuss large file
> with ActiveMQ, He said use BlobMessage was better than Streams,But
> BlobMessage why isn't support realtime transfer? Realtime transfer for
> large
> file maybe better in P2P .
>
> sender
>
> package mytest.file;
>
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.JMSException;
>
> /**
>  * 通过 ActiveMQ 发送文件的程序
>  *
>  * @author hailiang
>  */
> public class StreamSender {
>
>         /**
>          * @param args
>          * @throws JMSException
>          */
>         public static void main(String[] args) throws JMSException {
>                 FileInputStream in;
>                 try {
>                         in = new
> FileInputStream("d:\\freeswitch-1.0.7.tar.gz");
>                         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(
>
> "tcp://
> 192.168.17.4:61616?jms.blobTransferPolicy.defaultUploadUrl=http://admin:admin@192.168.17.4:8161/fileserver/
> ");
>                         ActiveMQConnection connection =
> (ActiveMQConnection) connectionFactory
>                                         .createConnection();
>                         connection.start();
>                         Session session = connection.createSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>                         Queue destination =
> session.createQueue("stream.file");
>                         OutputStream out =
> connection.createOutputStream(destination);
>
>                         // now write the file on to ActiveMQ
>                         byte[] buffer = new byte[1024];
>                         while (true) {
>                                 int bytesRead = in.read(buffer);
>                                 if (bytesRead == -1) {
>                                         break;
>                                 }
>                                 System.out.println("sender\r\n");
>                                 out.write(buffer, 0, bytesRead);
>                         }
>                         out.close();
>                 } catch (FileNotFoundException e) {
>                         e.printStackTrace();
>                 } catch (IOException e) {
>                         // TODO Auto-generated catch block
>                         e.printStackTrace();
>                 }
>
>         }
> }
>
>
> receiver
>
> package mytest.file;
>
> import org.apache.activemq.ActiveMQConnection;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.BlobMessage;
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.swing.JFileChooser;
>
> public class StreamReceiver {
>
>     /**
>      * @param args
>      * @throws JMSException
>      */
>     public static void main(String[] args) throws JMSException {
>
>
>                 FileOutputStream out;
>                         try {
>                                 out = new
> FileOutputStream("e:\\tt\\sfreeswitch-1.0.7.tar.gz");
>                                 ActiveMQConnectionFactory
> connectionFactory = new
> ActiveMQConnectionFactory("tcp://192.168.17.4:61616");
>                         ActiveMQConnection connection =
> (ActiveMQConnection)
> connectionFactory.createConnection();
>                         connection.start();
>                         Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                         //we want be be an exclusive consumer
>                         String exclusiveQueueName= "stream.file";
>                         Queue destination =
> session.createQueue(exclusiveQueueName);
>
>                         InputStream in =
> connection.createInputStream(destination);
>
>                         //now write the file from ActiveMQ
>                         byte[] buffer = new byte[1024];
>                         while(true){
>                             int bytesRead;
>                                                 bytesRead =
> in.read(buffer);
>                                                  if (bytesRead==-1){
>
>  System.out.println("receiver\r\n");
>                                                 break;
>                                             }
>                                                  if(bytesRead>0)
>                                             out.write(buffer,0,bytesRead);
>
>                                         }
>                                         out.close();
>                         } catch (FileNotFoundException e) {
>                                         e.printStackTrace();
>                                 } catch (IOException e) {
>                                         // TODO Auto-generated catch block
>                                         e.printStackTrace();
>                                 }
>     }
> }
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667386.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Re: Re:Re: Re:about blobmessage

Posted by luodaidong <lu...@sina.com>.
I am newer for ActiveMQ, I use following code for streaming file.but every
time the receiver just receive 768K, and then stop there. But the sender is
still send data. I don't know why? I also saw someone discuss large file
with ActiveMQ, He said use BlobMessage was better than Streams,But
BlobMessage why isn't support realtime transfer? Realtime transfer for large
file maybe better in P2P .

sender

package mytest.file;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.JMSException;

/**
 * 通过 ActiveMQ 发送文件的程序
 * 
 * @author hailiang
 */
public class StreamSender {

	/**
	 * @param args
	 * @throws JMSException
	 */
	public static void main(String[] args) throws JMSException {
		FileInputStream in;
		try {
			in = new FileInputStream("d:\\freeswitch-1.0.7.tar.gz");
			ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
				
"tcp://192.168.17.4:61616?jms.blobTransferPolicy.defaultUploadUrl=http://admin:admin@192.168.17.4:8161/fileserver/");
			ActiveMQConnection connection = (ActiveMQConnection) connectionFactory
					.createConnection();
			connection.start();
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Queue destination = session.createQueue("stream.file");
			OutputStream out = connection.createOutputStream(destination);

			// now write the file on to ActiveMQ
			byte[] buffer = new byte[1024];
			while (true) {
				int bytesRead = in.read(buffer);
				if (bytesRead == -1) {
					break;
				}
				System.out.println("sender\r\n");
				out.write(buffer, 0, bytesRead);
			}
			out.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}
}


receiver

package mytest.file;  

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.swing.JFileChooser;

public class StreamReceiver {  
	  
    /** 
     * @param args 
     * @throws JMSException 
     */  
    public static void main(String[] args) throws JMSException {  
  
  
        	FileOutputStream out;
			try {
				out = new FileOutputStream("e:\\tt\\sfreeswitch-1.0.7.tar.gz");
				ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://192.168.17.4:61616");
		        ActiveMQConnection connection = (ActiveMQConnection)
connectionFactory.createConnection();
		        connection.start();
		        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		        //we want be be an exclusive consumer
		        String exclusiveQueueName= "stream.file";
		        Queue destination = session.createQueue(exclusiveQueueName);
		        
		        InputStream in = connection.createInputStream(destination);
		        
		        //now write the file from ActiveMQ
		        byte[] buffer = new byte[1024];
		        while(true){
		            int bytesRead;
						bytesRead = in.read(buffer);
						 if (bytesRead==-1){
							 System.out.println("receiver\r\n");
				                break;
				            }
						 if(bytesRead>0)
				            out.write(buffer,0,bytesRead);
				            
				        }
				        out.close();
		        } catch (FileNotFoundException e) {
					e.printStackTrace();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
    }  
}  





--
View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667386.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re:Re: Re:about blobmessage

Posted by SuoNayi <su...@163.com>.
I do not think when using JMS Streams consumers can get stuck/hung is the expected behavior.
When the issue happens you may post the thread stack of your application so someone can help you.




At 2013-05-23 14:54:54,luodaidong <lu...@sina.com> wrote:
>but stream is not easy for using,sometime the receiver may hang 
>
>
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667384.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Re:about blobmessage

Posted by luodaidong <lu...@sina.com>.
but stream is not easy for using,sometime the receiver may hang 



--
View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381p4667384.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re:about blobmessage

Posted by SuoNayi <su...@163.com>.
try out the JMS Streams,http://activemq.apache.org/jms-streams.html



At 2013-05-23 11:51:23,luodaidong <lu...@sina.com> wrote:
>I use activeMQ blobmessage tranfering large file. It's OK, but the receiver
>can receive file only when the sender has sended the file completly. How
>about realtime transfer large file? where can I configure this?
>the test code following,where is wrong?
>
>the sender
>
>package mytest.file;  
>  
>import java.io.File;
>
>import javax.jms.Connection;
>import javax.jms.ConnectionFactory;
>import javax.jms.DeliveryMode;
>import javax.jms.Destination;
>import javax.jms.JMSException;
>import javax.jms.MessageProducer;
>import javax.jms.Session;
>import javax.swing.JFileChooser;
>
>import org.apache.activemq.ActiveMQConnectionFactory;
>import org.apache.activemq.ActiveMQSession;
>import org.apache.activemq.BlobMessage;
>  
>/** 
> * 通过 ActiveMQ 发送文件的程序 
> *  
> * @author hailiang 
> */  
>public class FileSender {  
>  
>    /** 
>     * @param args 
>     * @throws JMSException 
>     */  
>    public static void main(String[] args) throws JMSException {  
>		System.out.println("1111");
>        // 选择文件   
>        JFileChooser fileChooser = new JFileChooser();  
>        fileChooser.setDialogTitle("选择文件");  
>        if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION)
>{  
>            return;  
>        }  
>        File file = fileChooser.getSelectedFile();  
>  
>        // 获取 ConnectionFactory   
>        ConnectionFactory connectionFactory = new
>ActiveMQConnectionFactory("admin","admin" , 
>               
>"tcp://192.168.17.4:61616?jms.blobTransferPolicy.defaultUploadUrl=http://admin:admin@192.168.17.4:8161/fileserver/");  
>  
>        // 创建 Connection   
>        Connection connection = connectionFactory.createConnection();  
>        connection.start();  
>  
>        // 创建 Session   
>        ActiveMQSession session = (ActiveMQSession)
>connection.createSession(  
>                false, Session.AUTO_ACKNOWLEDGE);  
>  
>        // 创建 Destination   
>        Destination destination = session.createQueue("File.Transport");  
>  
>        // 创建 Producer   
>        MessageProducer producer = session.createProducer(destination);  
>        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置为非持久性   
>        // 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件   
>  
>        // 构造 BlobMessage,用来传输文件   
>        BlobMessage blobMessage = session.createBlobMessage(file);  
>        blobMessage.setStringProperty("FILE.NAME", file.getName());  
>        blobMessage.setLongProperty("FILE.SIZE", file.length());  
>        System.out.println("开始发送文件:" + file.getName() + ",文件大小:"  
>                + file.length() + " 字节");  
>  
>        // 7. 发送文件   
>        producer.send(blobMessage);  
>        System.out.println("完成文件发送:" + file.getName());  
>  
>        producer.close();  
>        session.close();  
>        connection.close(); // 不关闭 Connection, 程序则不退出   
>    }  
>}  
>
>
>the receiver
>package mytest.file;    
>import java.io.*;  
>import javax.jms.*;  
>import javax.jms.Message;  
>import javax.swing.*;  
>import org.apache.activemq.*;    
>
>  
>public class FileReciever {  
>  
>    /** 
>     * @param args 
>     * @throws JMSException 
>     */  
>    public static void main(String[] args) throws JMSException {  
>  
>        // 获取 ConnectionFactory   
>        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
>                "tcp://192.168.17.4:61616");  
>  
>        // 创建 Connection   
>        Connection connection = connectionFactory.createConnection();  
>        connection.start();  
>  
>        // 创建 Session   
>        Session session = connection.createSession(false,  
>                Session.AUTO_ACKNOWLEDGE);  
>  
>        // 创建 Destinatione   
>        Destination destination = session.createQueue("File.Transport");  
>  
>        // 创建 Consumer   
>        MessageConsumer consumer = session.createConsumer(destination);  
>  
>        // 注册消息监听器,当消息到达时被触发并处理消息   
>        consumer.setMessageListener(new MessageListener() {  
>  
>            // 监听器中处理消息   
>            public void onMessage(Message message) {  
>                if (message instanceof BlobMessage) {  
>                    BlobMessage blobMessage = (BlobMessage) message;  
>                    try {  
>                        String fileName = blobMessage  
>                                .getStringProperty("FILE.NAME");  
>                        System.out.println("文件接收请求处理:" + fileName + ",文件大小:"  
>                                + blobMessage.getLongProperty("FILE.SIZE")  
>                                + " 字节");  
>  
>                        JFileChooser fileChooser = new JFileChooser();  
>                        fileChooser.setDialogTitle("请指定文件保存位置");  
>                        fileChooser.setSelectedFile(new File(fileName));  
>                        if (fileChooser.showSaveDialog(null) ==
>JFileChooser.APPROVE_OPTION) {  
>                            File file = fileChooser.getSelectedFile();  
>                            OutputStream os = new FileOutputStream(file);  
>  
>                            System.out.println("开始接收文件:" + fileName);  
>                            InputStream inputStream = blobMessage  
>                                    .getInputStream();  
>  
>                            // 写文件,你也可以使用其他方式   
>                            byte[] buff = new byte[256];  
>                            int len = 0;  
>                            while ((len = inputStream.read(buff)) > 0) {  
>                                os.write(buff, 0, len);  
>                            }  
>                            os.close();  
>                            System.out.println("完成文件接收:" + fileName);  
>                        }  
>  
>                    } catch (Exception e) {  
>                        e.printStackTrace();  
>                    }  
>                }  
>            }  
>        });  
>    }  
>}  
>
>
>
>
>--
>View this message in context: http://activemq.2283324.n4.nabble.com/about-blobmessage-tp4667381.html
>Sent from the ActiveMQ - User mailing list archive at Nabble.com.