You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by jayasreeb <ja...@infosys.com> on 2008/10/17 05:26:43 UTC

Error In Sending SOAP With Attachment using Active MQ

Hi, 

My requirement is to place a SOAP message with an attachment (binary data: a
Java object) into a JMS message. I am using fuse-message-broker-5.0.0.17. 

I have a producer program that creates a SOAP message with a single
attachment using a DataHandler and converts it with
MessageTransformer.SOAPMessageIntoJMSMessage. When I check the attachment
count in the Producer class, it is 1 before the SOAP message is placed into
the JMS message. 

I have a Consumer class that picks up the same JMS message and displays the
content of the SOAP message. When I check the attachment count in this class,
it is 0. 

Do I need to change any settings in ActiveMQ or in Consumer.java so that the
attachment is accepted? 

I have placed the following jars in my workspace: 
1.activation.jar 
2.axis.jar(1.4) 
3.imq.jar 
4.imqxm.jar 
5.saaj.jar 
6.mail.jar 
7.wsdl4j-1.5.1.jar 
8.jaxm-api.jar 
9.jaxrpc.jar 
10.commons-discovery-0.2.jar 
11.commons-logging-1.0.4.jar 


Please find the source of both the Producer and Consumer classes below.
Please suggest a solution for this problem. 


Producer.java 
import java.awt.datatransfer.DataFlavor; 
import java.io.File; 
import java.net.URL; 

import javax.xml.parsers.DocumentBuilder; 
import javax.xml.parsers.DocumentBuilderFactory; 
import javax.xml.soap.MessageFactory; 
import javax.activation.DataHandler; 
import javax.activation.DataSource; 
import javax.activation.FileDataSource; 
import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.mail.util.ByteArrayDataSource; 
import javax.xml.soap.AttachmentPart; 
import javax.xml.soap.SOAPBody; 
import javax.xml.soap.SOAPBodyElement; 
import javax.xml.soap.SOAPEnvelope; 
import javax.xml.soap.SOAPMessage; 
import javax.xml.soap.SOAPPart; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.util.IndentPrinter; 
import org.w3c.dom.Document; 
import com.sun.messaging.xml.MessageTransformer; 

public class Producer { 
        private Destination destination; 
    private int messageCount = 10; 
    private long sleepTime; 
    private boolean verbose = true; 
    private int messageSize = 255; 
    private long timeToLive; 
    private String user = ActiveMQConnection.DEFAULT_USER; 
    private String password = ActiveMQConnection.DEFAULT_PASSWORD; 
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 
    private String subject = "TOOL1.DEFAULT"; 
    private boolean topic; 
    private boolean transacted; 
    private boolean persistent; 
    public static void main(String[] args) { 
        Producer producerTool = new Producer(); 
        producerTool.run(); 
    } 
    public void run() { 
        Connection connection = null; 
        try { 
            System.out.println("Connecting to URL: " + url); 
            System.out.println("Publishing a Message with size " +
messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); 
            System.out.println("Using " + (persistent ? "persistent" :
"non-persistent") + " messages"); 
            System.out.println("Sleeping between publish " + sleepTime + "
ms"); 
            if (timeToLive != 0) { 
                System.out.println("Messages time to live " + timeToLive + "
ms"); 
            } 

            // Create the connection. 
         ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url); 
          //  ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://blrkec38454d.ad.infosys.com:61616"); 
            
            connection = connectionFactory.createConnection(); 
            connection.start(); 

            // Create the session 
            Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE); 
            if (topic) { 
                destination = session.createTopic(subject); 
            } else { 
                destination = session.createQueue(subject); 
            } 

            // Create the producer. 
            MessageProducer producer = session.createProducer(destination); 
            if (persistent) { 
                producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
            } else { 
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
            } 
            if (timeToLive != 0) { 
                producer.setTimeToLive(timeToLive); 
            } 

            // Start sending messages 
            sendLoop(session, producer); 

            System.out.println("Done."); 

            // Use the ActiveMQConnection interface to dump the connection 
            // stats. 
            ActiveMQConnection c = (ActiveMQConnection)connection; 
            c.getConnectionStats().dump(new IndentPrinter()); 

        } catch (Exception e) { 
            System.out.println("Caught: " + e); 
            e.printStackTrace(); 
        } finally { 
            try { 
                connection.close(); 
            } catch (Throwable ignore) { 
            } 
        } 
    } 
    protected void sendLoop(Session session, MessageProducer producer)
throws Exception { 

        //for (int i = 0; i < messageCount || messageCount == 0; i++) { 
       try{ 
      System.out.println("in send loop"); 
          

            /*construct a default soap MessageFactory */ 
            MessageFactory mf = MessageFactory.newInstance(); 
            
            /* Create a SOAP message object.*/ 
            SOAPMessage soapMessage = mf.createMessage(); 
            
          // Test with objects 
            
            Person p = new Person(); 
            Address a = new Address(); 
            Address b = new Address(); 
            p.setLname("Jayasree"); 
            p.setMname(""); 
            p.setLname("Balasubramanian"); 
            a.setAdline1("infosys"); 
            a.setAdline2(""); 
            a.setCity("mangalore"); 
            a.setCountry("india"); 
            a.setState(""); 
            p.setAddress(a); 
            


          
            
               DocumentBuilderFactory factory = 
          DocumentBuilderFactory.newInstance(); 
          factory.setNamespaceAware(true); 
        
          DocumentBuilder builder = 
          factory.newDocumentBuilder(); 
          Document document = builder.parse( new File("C:/Address_Std.xml")
); 
        
//            document can be any XML document 
          SOAPBody soapBody = soapMessage.getSOAPBody(); 
          SOAPBodyElement docElement =  soapBody.addDocument(document); 
          
          
          
                    
                    
        //Create an attachment with the Java Framework Activation API 
           URL url = new URL("http://java.sun.com/webservices/"); 
           DataHandler dh = new DataHandler (url); 
           AttachmentPart ap = soapMessage.createAttachmentPart(dh); 

           //Set content type and ID 
           ap.setContentType("text/html"); 
           ap.setContentId("cid-001"); 

           //Add attachment to the SOAP message 
           soapMessage.addAttachmentPart(ap); 
           soapMessage.saveChanges(); 

                  

    // add attachment to message       
          
           soapMessage.saveChanges(); 
            
           soapMessage.writeTo(System.out); 
           System.out.println("inside producer----att
count---"+soapMessage.countAttachments()); 
            Message m =
MessageTransformer.SOAPMessageIntoJMSMessage(soapMessage, session ); 
            System.out.println("Display the SOAP message"+m); 
            producer.send(m); 
            if (transacted) { 
                session.commit(); 
            } 

           // Thread.sleep(sleepTime); 

        
    }catch(Exception e){ 
    System.out.println("exception-->"+e); 
    e.printStackTrace(); 
    } 
    } 
    

    
    public void setPersistent(boolean durable) { 
        this.persistent = durable; 
    } 

    public void setMessageCount(int messageCount) { 
        this.messageCount = messageCount; 
    } 

    public void setMessageSize(int messageSize) { 
        this.messageSize = messageSize; 
    } 

    public void setPassword(String pwd) { 
        this.password = pwd; 
    } 

    public void setSleepTime(long sleepTime) { 
        this.sleepTime = sleepTime; 
    } 

    public void setSubject(String subject) { 
        this.subject = subject; 
    } 

    public void setTimeToLive(long timeToLive) { 
        this.timeToLive = timeToLive; 
    } 

    public void setTopic(boolean topic) { 
        this.topic = topic; 
    } 

    public void setQueue(boolean queue) { 
        this.topic = !queue; 
    } 

    public void setTransacted(boolean transacted) { 
        this.transacted = transacted; 
    } 

    public void setUrl(String url) { 
        this.url = url; 
    } 

    public void setUser(String user) { 
        this.user = user; 
    } 

    public void setVerbose(boolean verbose) { 
        this.verbose = verbose; 
    } 
} 


Consumer.java 

 import java.io.IOException; 
import java.util.Arrays; 

import javax.activation.DataHandler; 
import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.ExceptionListener; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 
import javax.xml.soap.AttachmentPart; 
import javax.xml.soap.MessageFactory; 
import javax.xml.soap.Name; 
import javax.xml.soap.SOAPBody; 
import javax.xml.soap.SOAPBodyElement; 
import javax.xml.soap.SOAPException; 
import javax.xml.soap.SOAPMessage; 
import javax.xml.transform.Source; 
import javax.xml.transform.Transformer; 
import javax.xml.transform.TransformerFactory; 
import javax.xml.transform.stream.StreamResult; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

import com.sun.messaging.xml.MessageTransformer; 

public class Consumer implements MessageListener, ExceptionListener { 
        private boolean running; 

    private Session session; 
    private Destination destination; 
    private MessageProducer replyProducer; 

    private boolean pauseBeforeShutdown; 
    private boolean verbose = true; 
    private int maxiumMessages; 
    private String subject = "TOOL1.DEFAULT"; 
    private boolean topic; 
    private String user = ActiveMQConnection.DEFAULT_USER; 
    private String password = ActiveMQConnection.DEFAULT_PASSWORD; 
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 
    private boolean transacted; 
    private boolean durable; 
    private String clientId; 
    private int ackMode = Session.AUTO_ACKNOWLEDGE; 
    private String consumerName = "James"; 
    private long sleepTime; 
    private long receiveTimeOut; 
    Name bodyName; 

    public static void main(String[] args) { 
        Consumer consumerTool = new Consumer(); 
        consumerTool.run(); 
    } 

    public void run() { 
        try { 
            running = true; 

            System.out.println("Connecting to URL: " + url); 
            System.out.println("Consuming " + (topic ? "topic" : "queue") +
": " + subject); 
            System.out.println("Using a " + (durable ? "durable" :
"non-durable") + " subscription"); 

            ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url); 
            Connection connection = connectionFactory.createConnection(); 
            if (durable && clientId != null && clientId.length() > 0 &&
!"null".equals(clientId)) { 
                connection.setClientID(clientId); 
            } 
            connection.setExceptionListener(this); 
            connection.start(); 

            session = connection.createSession(transacted, ackMode); 
            if (topic) { 
                destination = session.createTopic(subject); 
            } else { 
                destination = session.createQueue(subject); 
            } 

            replyProducer = session.createProducer(null); 
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

            MessageConsumer consumer = null; 
            if (durable && topic) { 
                consumer =
session.createDurableSubscriber((Topic)destination, consumerName); 
            } else { 
                consumer = session.createConsumer(destination); 
            } 
            System.out.println("before  and  calling the consume method"); 
            if (maxiumMessages > 0) { 
            System.out.println("Inside if part"); 
                consumeMessagesAndClose(connection, session, consumer); 
            } else { 
                if (receiveTimeOut == 0) { 
                System.out.println("Inside else if part"); 
                    consumer.setMessageListener(this); 
                } else { 
                System.out.println("Inside else part"); 
                    consumeMessagesAndClose(connection, session, consumer,
receiveTimeOut); 
                } 
            } 

        } catch (Exception e) { 
            System.out.println("Caught: " + e); 
            e.printStackTrace(); 
        } 
    } 

    public void onMessage(Message message) { 
        try { 

            MessageFactory messageFactory = MessageFactory.newInstance(); 
            
            SOAPMessage soapMessage = 
             MessageTransformer.SOAPMessageFromJMSMessage(
message,messageFactory ); 
            soapMessage.writeTo(System.out); 
            
 //Extract the content of the reply 
  System.out.println("Attachment count in consumer
---->"+soapMessage.countAttachments()); 
  
  
  

  
  
 java.util.Iterator iterator = soapMessage.getAttachments(); 
  
 while (iterator.hasNext()) { 
        
  
          
     DataHandler dh = ((AttachmentPart)iterator.next()).getDataHandler(); 
    Object obj = dh.getContent(); 
   //  if(null != fname) 
       // return new File(fname); 
    
 } 
  

 Source sourceContent = soapMessage.getSOAPPart().getContent(); 
 //Set the output for the transformation 
  
 StreamResult result = new StreamResult(System.out); 

    TransformerFactory transformerFactory = 
    TransformerFactory.newInstance(); 

Transformer transformer = 
 transformerFactory.newTransformer(); 
 transformer.transform(sourceContent, result); 


            if (message.getJMSReplyTo() != null) { 
            
                replyProducer.send(message.getJMSReplyTo(),
session.createTextMessage("Reply: " + message.getJMSMessageID())); 
            } 

            if (transacted) { 
            
                session.commit(); 
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) { 
                message.acknowledge(); 
            } 

        } catch (JMSException e) { 
           /* System.out.println("Caught JMS Error Code: " +
e.getErrorCode()); 
            System.out.println("Caught JMS: " + e.getLocalizedMessage()); 
            System.out.println("Caught JMS: " + e.getMessage());*/ 
            e.printStackTrace(); 
        } 
        catch (Exception e) { 
        
             System.out.println("Caught JMS: " + e.getMessage()); 
        e.printStackTrace(); 
        
        } 
        finally { 
        } 
            if (sleepTime > 0) { 
                try { 
                    Thread.sleep(sleepTime); 
                } catch (InterruptedException e) { 
                } 
            } 
        } 
    

    public synchronized void onException(JMSException ex) { 
        System.out.println("JMS Exception occured.  Shutting down client."); 
        running = false; 
    } 

    synchronized boolean isRunning() { 
        return running; 
    } 

    protected void consumeMessagesAndClose(Connection connection, Session
session, MessageConsumer consumer) throws JMSException, IOException { 
        System.out.println("We are about to wait until we consume: " +
maxiumMessages + " message(s) then we will shutdown"); 
        System.out.println("Inside method part"); 
        for (int i = 0; i < maxiumMessages && isRunning();) { 
            Message message = consumer.receive(1000); 
            if (message != null) { 
                i++; 
                onMessage(message); 
            } 
        } 
        System.out.println("Closing connection"); 
        consumer.close(); 
        session.close(); 
        connection.close(); 
        if (pauseBeforeShutdown) { 
            System.out.println("Press return to shut down"); 
            System.in.read(); 
        } 
    } 

    protected void consumeMessagesAndClose(Connection connection, Session
session, MessageConsumer consumer, long timeout) throws JMSException,
IOException { 
        System.out.println("We will consume messages while they continue to
be delivered within: " + timeout + " ms, and then we will shutdown"); 

        Message message; 
        while ((message = consumer.receive(timeout)) != null) { 
            onMessage(message); 
        } 

        System.out.println("Closing connection"); 
        consumer.close(); 
        session.close(); 
        connection.close(); 
        if (pauseBeforeShutdown) { 
            System.out.println("Press return to shut down"); 
            System.in.read(); 
        } 
    } 

    public void setAckMode(String ackMode) { 
        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) { 
            this.ackMode = Session.CLIENT_ACKNOWLEDGE; 
        } 
        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) { 
            this.ackMode = Session.AUTO_ACKNOWLEDGE; 
        } 
        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) { 
            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; 
        } 
        if ("SESSION_TRANSACTED".equals(ackMode)) { 
            this.ackMode = Session.SESSION_TRANSACTED; 
        } 
    } 

    public void setClientId(String clientID) { 
        this.clientId = clientID; 
    } 

    public void setConsumerName(String consumerName) { 
        this.consumerName = consumerName; 
    } 

    public void setDurable(boolean durable) { 
        this.durable = durable; 
    } 

    public void setMaxiumMessages(int maxiumMessages) { 
        this.maxiumMessages = maxiumMessages; 
    } 

    public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) { 
        this.pauseBeforeShutdown = pauseBeforeShutdown; 
    } 

    public void setPassword(String pwd) { 
        this.password = pwd; 
    } 

    public void setReceiveTimeOut(long receiveTimeOut) { 
        this.receiveTimeOut = receiveTimeOut; 
    } 

    public void setSleepTime(long sleepTime) { 
        this.sleepTime = sleepTime; 
    } 

    public void setSubject(String subject) { 
        this.subject = subject; 
    } 

    public void setTopic(boolean topic) { 
        this.topic = topic; 
    } 

    public void setQueue(boolean queue) { 
        this.topic = !queue; 
    } 

    public void setTransacted(boolean transacted) { 
        this.transacted = transacted; 
    } 

    public void setUrl(String url) { 
        this.url = url; 
    } 

    public void setUser(String user) { 
        this.user = user; 
    } 

    public void setVerbose(boolean verbose) { 
        this.verbose = verbose; 
    } 

} 




-- 
View this message in context: http://www.nabble.com/Error-In-Sending-SOAP-With-Attachment-using-Active-MQ-tp20026508p20026508.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.