You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/14 22:19:52 UTC

svn commit: r377835 - /incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java

Author: gnodet
Date: Tue Feb 14 13:19:50 2006
New Revision: 377835

URL: http://svn.apache.org/viewcvs?rev=377835&view=rev
Log:
Add logging of messages sent / accepted.
Remove tabs

Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=377835&r1=377834&r2=377835&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Tue Feb 14 13:19:50 2006
@@ -15,8 +15,22 @@
  */
 package org.apache.servicemix.jbi.messaging;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jbi.component.Component;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import javax.xml.transform.dom.DOMSource;
 
 import org.apache.activemq.util.IdGenerator;
 import org.apache.commons.logging.Log;
@@ -29,23 +43,13 @@
 import org.apache.servicemix.jbi.framework.ComponentConnector;
 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
 import org.apache.servicemix.jbi.framework.LocalComponentConnector;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
+import org.apache.servicemix.jbi.util.DOMUtil;
+import org.w3c.dom.Node;
 
-import javax.jbi.component.Component;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessageExchangeFactory;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * DeliveryChannel implementation
@@ -53,9 +57,9 @@
  * @version $Revision$
  */
 public class DeliveryChannelImpl implements DeliveryChannel {
-    
+
     private static final Log log = LogFactory.getLog(DeliveryChannel.class);
-    
+
     private JBIContainer container;
     private ComponentContextImpl context;
     private LocalComponentConnector componentConnector;
@@ -248,13 +252,16 @@
     public MessageExchange accept() throws MessagingException {
         try {
             checkNotClosed();
-        	MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
+            MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
             if (log.isDebugEnabled()) {
                 log.debug("Accepting " + me.getExchangeId() + " in " + this);
             }
-        	resumeTx(me);
+            resumeTx(me);
             me.handleAccept();
-        	return me;
+            if (log.isTraceEnabled()) {
+                traceMessageExchange("Accepted", me);
+            }
+            return me;
         }
         catch (IllegalStateException e) {
             throw new MessagingException("DeliveryChannel has been closed.");
@@ -264,6 +271,45 @@
         }
     }
 
+    private void traceMessageExchange(String header, MessageExchange me) {
+        try {
+            StringBuffer sb = new StringBuffer();
+            sb.append(header);
+            sb.append(": ");
+            sb.append("MessageExchange[\n");
+            sb.append("  id: ").append(me.getExchangeId()).append('\n');
+            sb.append("  status: ").append(me.getStatus()).append('\n');
+            if (me.getMessage("in") != null) {
+                Node node = new SourceTransformer().toDOMNode(me.getMessage("in"));
+                me.getMessage("in").setContent(new DOMSource(node));
+                String str = DOMUtil.asXML(node);
+                sb.append("  in: ");
+                if (str.length() > 150) {
+                    sb.append(str, 0, 150).append("...");
+                } else {
+                    sb.append(str);
+                }
+                sb.append('\n');
+            }
+            if (me.getMessage("out") != null) {
+                Node node = new SourceTransformer().toDOMNode(me.getMessage("out"));
+                me.getMessage("out").setContent(new DOMSource(node));
+                String str = DOMUtil.asXML(node);
+                sb.append("  out: ");
+                if (str.length() > 150) {
+                    sb.append(str, 0, 150).append("...");
+                } else {
+                    sb.append(str);
+                }
+                sb.append('\n');
+            }
+            sb.append("]");
+            log.trace(sb.toString());
+        } catch (Exception e) {
+            log.trace("Unable to display message", e);
+        }
+    }
+
     /**
      * return a MessageExchange
      * 
@@ -274,8 +320,8 @@
     public MessageExchange accept(long timeoutMS) throws MessagingException {
         try {
             checkNotClosed();
-        	MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
-        	if (me != null) {
+            MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
+            if (me != null) {
                 // If the exchange has already timed out,
                 // do not give it to the component
                 if (me.getPacket().isAborted()) {
@@ -287,11 +333,14 @@
                     if (log.isDebugEnabled()) {
                         log.debug("Accepting " + me.getExchangeId() + " in " + this);
                     }
-            		resumeTx(me);
+                    resumeTx(me);
                     me.handleAccept();
+                    if (log.isTraceEnabled()) {
+                        traceMessageExchange("Accepted", me);
+                    }
                 }
-        	}
-        	return me;
+            }
+            return me;
         }
         catch (InterruptedException e) {
             throw new MessagingException("accept failed", e);
@@ -300,6 +349,9 @@
 
     protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
         try {
+            if (log.isTraceEnabled()) {
+                traceMessageExchange("Sent", messageExchange);
+            }
             // If the delivery channel has been closed
             checkNotClosed();
             // If the message has timed out
@@ -311,14 +363,14 @@
             // Update persistence info
             Boolean persistent = messageExchange.getPersistent();
             if (persistent == null) {
-            	if (context.getActivationSpec().getPersistent() != null) {
-            		persistent = context.getActivationSpec().getPersistent();
-            	} else {
-            		persistent = Boolean.valueOf(context.getContainer().isPersistent());
-            	}
-            	messageExchange.setPersistent(persistent);
+                if (context.getActivationSpec().getPersistent() != null) {
+                    persistent = context.getActivationSpec().getPersistent();
+                } else {
+                    persistent = Boolean.valueOf(context.getContainer().isPersistent());
+                }
+                messageExchange.setPersistent(persistent);
             }
-            
+
             if (exchangeThrottling) {
                 if (throttlingInterval > intervalCount) {
                     intervalCount = 0;
@@ -331,7 +383,7 @@
                 }
                 intervalCount++;
             }
-            
+
             // Update stats
             long currentTime = System.currentTimeMillis();
             if (container.isNotifyStatistics()) {
@@ -351,11 +403,11 @@
                 messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
             }
             lastSendTime = currentTime;
-            
+
             if (messageExchange.getRole() == Role.CONSUMER) {
                 messageExchange.setSourceId(componentConnector.getComponentNameSpace());
             }
-    
+
             // Call the listeners before the ownership changes
             container.callListeners(messageExchange);
             messageExchange.handleSend(sync);
@@ -378,7 +430,7 @@
                 }
             }
         }
-        
+
         /*
         if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
             synchronized (messageExchange.getMirror()) {
@@ -398,10 +450,10 @@
      */
     public void send(MessageExchange messageExchange) throws MessagingException {
         messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
-    	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
-   		doSend(messageExchangeImpl, false);
+        MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+        doSend(messageExchangeImpl, false);
     }
-    
+
     /**
      * routes a MessageExchange
      * 
@@ -410,7 +462,7 @@
      * @throws MessagingException
      */
     public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
-    	return sendSync(messageExchange, Long.MAX_VALUE);
+        return sendSync(messageExchange, Long.MAX_VALUE);
     }
 
     /**
@@ -428,7 +480,7 @@
         }
         // JBI 5.5.2.1.3: set the sendSync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
-    	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+        MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
         exchangesById.put(messageExchange.getExchangeId(), messageExchange);
         try {
             // Synchronously send a message and wait for the response
@@ -602,7 +654,7 @@
         // Here, we don't need to put the message in the queue
         MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
         if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT &&
-        	theOriginal.getRole() == me.getRole()) {
+            theOriginal.getRole() == me.getRole()) {
             suspendTx(theOriginal);
             synchronized (theOriginal) {
                 theOriginal.copyFrom(me);
@@ -671,42 +723,42 @@
     }
 
     protected void suspendTx(MessageExchangeImpl me) throws MessagingException {
-    	try {
+        try {
             Transaction oldTx = me.getTransactionContext();
             if (oldTx != null) {
                 TransactionManager tm = (TransactionManager) container.getTransactionManager();
-    			if (tm != null) {
+                if (tm != null) {
                     if (log.isDebugEnabled()) {
                         log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
                     }
-    				Transaction tx = tm.suspend();
-    				if (tx != oldTx) {
-    					throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
-    				}
-    			}
-            }
-    	} catch (Exception e) {
-    		throw new MessagingException(e);
-    	}
+                    Transaction tx = tm.suspend();
+                    if (tx != oldTx) {
+                        throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
     }
-    
+
     protected void resumeTx(MessageExchangeImpl me) throws MessagingException {
-    	try {
-			Transaction oldTx = me.getTransactionContext();
+        try {
+            Transaction oldTx = me.getTransactionContext();
             if (oldTx != null) {
                 TransactionManager tm = (TransactionManager) container.getTransactionManager();
-    			if (tm != null) {
+                if (tm != null) {
                     if (log.isDebugEnabled()) {
                         log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
                     }
-    				tm.resume(oldTx);
-    			}
+                    tm.resume(oldTx);
+                }
             }
-    	} catch (Exception e) {
-    		throw new MessagingException(e);
-    	}
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
     }
-    
+
     /**
      * If the jbi container configured to do so, the message exchange will
      * automatically be enlisted in the current transaction, if exists. 
@@ -715,26 +767,27 @@
      * @throws MessagingException
      */
     protected void autoEnlistInTx(MessageExchangeImpl me) throws MessagingException {
-    	try {
-        	if (container.isAutoEnlistInTransaction()) {
-        		TransactionManager tm = (TransactionManager) container.getTransactionManager();
-        		if (tm != null) {
-        			Transaction tx = tm.getTransaction();
-        			if (tx != null) {
-        				Object oldTx = me.getTransactionContext();
-        				if (oldTx == null) {
-        					me.setTransactionContext(tx);
-        				} else if (oldTx != tx) {
-        					throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
-        				}
-        			}
-        		}
-        	}
-    	} catch (Exception e) {
-    		throw new MessagingException(e);
-    	}
+        try {
+            if (container.isAutoEnlistInTransaction()) {
+                TransactionManager tm = (TransactionManager) container.getTransactionManager();
+                if (tm != null) {
+                    Transaction tx = tm.getTransaction();
+                    if (tx != null) {
+                        Object oldTx = me.getTransactionContext();
+                        if (oldTx == null) {
+                            me.setTransactionContext(tx);
+                        } else if (oldTx != tx) {
+                            throw new IllegalStateException(
+                                    "the transaction context set in the messageExchange is not bound to the current thread");
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
     }
-    
+
     /**
      * @return pretty print
      */