You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/14 22:19:52 UTC
svn commit: r377835 -
/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
Author: gnodet
Date: Tue Feb 14 13:19:50 2006
New Revision: 377835
URL: http://svn.apache.org/viewcvs?rev=377835&view=rev
Log:
Add logging of messages sent / accepted.
Remove tabs
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=377835&r1=377834&r2=377835&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Tue Feb 14 13:19:50 2006
@@ -15,8 +15,22 @@
*/
package org.apache.servicemix.jbi.messaging;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jbi.component.Component;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import javax.xml.transform.dom.DOMSource;
import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
@@ -29,23 +43,13 @@
import org.apache.servicemix.jbi.framework.ComponentConnector;
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import org.apache.servicemix.jbi.framework.LocalComponentConnector;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
+import org.apache.servicemix.jbi.util.DOMUtil;
+import org.w3c.dom.Node;
-import javax.jbi.component.Component;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessageExchangeFactory;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* DeliveryChannel implementation
@@ -53,9 +57,9 @@
* @version $Revision$
*/
public class DeliveryChannelImpl implements DeliveryChannel {
-
+
private static final Log log = LogFactory.getLog(DeliveryChannel.class);
-
+
private JBIContainer container;
private ComponentContextImpl context;
private LocalComponentConnector componentConnector;
@@ -248,13 +252,16 @@
public MessageExchange accept() throws MessagingException {
try {
checkNotClosed();
- MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
+ MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
if (log.isDebugEnabled()) {
log.debug("Accepting " + me.getExchangeId() + " in " + this);
}
- resumeTx(me);
+ resumeTx(me);
me.handleAccept();
- return me;
+ if (log.isTraceEnabled()) {
+ traceMessageExchange("Accepted", me);
+ }
+ return me;
}
catch (IllegalStateException e) {
throw new MessagingException("DeliveryChannel has been closed.");
@@ -264,6 +271,45 @@
}
}
+ private void traceMessageExchange(String header, MessageExchange me) {
+ try {
+ StringBuffer sb = new StringBuffer();
+ sb.append(header);
+ sb.append(": ");
+ sb.append("MessageExchange[\n");
+ sb.append(" id: ").append(me.getExchangeId()).append('\n');
+ sb.append(" status: ").append(me.getStatus()).append('\n');
+ if (me.getMessage("in") != null) {
+ Node node = new SourceTransformer().toDOMNode(me.getMessage("in"));
+ me.getMessage("in").setContent(new DOMSource(node));
+ String str = DOMUtil.asXML(node);
+ sb.append(" in: ");
+ if (str.length() > 150) {
+ sb.append(str, 0, 150).append("...");
+ } else {
+ sb.append(str);
+ }
+ sb.append('\n');
+ }
+ if (me.getMessage("out") != null) {
+ Node node = new SourceTransformer().toDOMNode(me.getMessage("out"));
+ me.getMessage("out").setContent(new DOMSource(node));
+ String str = DOMUtil.asXML(node);
+ sb.append(" out: ");
+ if (str.length() > 150) {
+ sb.append(str, 0, 150).append("...");
+ } else {
+ sb.append(str);
+ }
+ sb.append('\n');
+ }
+ sb.append("]");
+ log.trace(sb.toString());
+ } catch (Exception e) {
+ log.trace("Unable to display message", e);
+ }
+ }
+
/**
* return a MessageExchange
*
@@ -274,8 +320,8 @@
public MessageExchange accept(long timeoutMS) throws MessagingException {
try {
checkNotClosed();
- MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
- if (me != null) {
+ MessageExchangeImpl me = (MessageExchangeImpl) queue.poll(timeoutMS);
+ if (me != null) {
// If the exchange has already timed out,
// do not give it to the component
if (me.getPacket().isAborted()) {
@@ -287,11 +333,14 @@
if (log.isDebugEnabled()) {
log.debug("Accepting " + me.getExchangeId() + " in " + this);
}
- resumeTx(me);
+ resumeTx(me);
me.handleAccept();
+ if (log.isTraceEnabled()) {
+ traceMessageExchange("Accepted", me);
+ }
}
- }
- return me;
+ }
+ return me;
}
catch (InterruptedException e) {
throw new MessagingException("accept failed", e);
@@ -300,6 +349,9 @@
protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
try {
+ if (log.isTraceEnabled()) {
+ traceMessageExchange("Sent", messageExchange);
+ }
// If the delivery channel has been closed
checkNotClosed();
// If the message has timed out
@@ -311,14 +363,14 @@
// Update persistence info
Boolean persistent = messageExchange.getPersistent();
if (persistent == null) {
- if (context.getActivationSpec().getPersistent() != null) {
- persistent = context.getActivationSpec().getPersistent();
- } else {
- persistent = Boolean.valueOf(context.getContainer().isPersistent());
- }
- messageExchange.setPersistent(persistent);
+ if (context.getActivationSpec().getPersistent() != null) {
+ persistent = context.getActivationSpec().getPersistent();
+ } else {
+ persistent = Boolean.valueOf(context.getContainer().isPersistent());
+ }
+ messageExchange.setPersistent(persistent);
}
-
+
if (exchangeThrottling) {
if (throttlingInterval > intervalCount) {
intervalCount = 0;
@@ -331,7 +383,7 @@
}
intervalCount++;
}
-
+
// Update stats
long currentTime = System.currentTimeMillis();
if (container.isNotifyStatistics()) {
@@ -351,11 +403,11 @@
messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
}
lastSendTime = currentTime;
-
+
if (messageExchange.getRole() == Role.CONSUMER) {
messageExchange.setSourceId(componentConnector.getComponentNameSpace());
}
-
+
// Call the listeners before the ownership changes
container.callListeners(messageExchange);
messageExchange.handleSend(sync);
@@ -378,7 +430,7 @@
}
}
}
-
+
/*
if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
synchronized (messageExchange.getMirror()) {
@@ -398,10 +450,10 @@
*/
public void send(MessageExchange messageExchange) throws MessagingException {
messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
- MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
- doSend(messageExchangeImpl, false);
+ MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+ doSend(messageExchangeImpl, false);
}
-
+
/**
* routes a MessageExchange
*
@@ -410,7 +462,7 @@
* @throws MessagingException
*/
public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
- return sendSync(messageExchange, Long.MAX_VALUE);
+ return sendSync(messageExchange, Long.MAX_VALUE);
}
/**
@@ -428,7 +480,7 @@
}
// JBI 5.5.2.1.3: set the sendSync property
messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
- MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
+ MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
exchangesById.put(messageExchange.getExchangeId(), messageExchange);
try {
// Synchronously send a message and wait for the response
@@ -602,7 +654,7 @@
// Here, we don't need to put the message in the queue
MessageExchangeImpl theOriginal = (MessageExchangeImpl) exchangesById.get(me.getExchangeId());
if (theOriginal != null && theOriginal.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT &&
- theOriginal.getRole() == me.getRole()) {
+ theOriginal.getRole() == me.getRole()) {
suspendTx(theOriginal);
synchronized (theOriginal) {
theOriginal.copyFrom(me);
@@ -671,42 +723,42 @@
}
protected void suspendTx(MessageExchangeImpl me) throws MessagingException {
- try {
+ try {
Transaction oldTx = me.getTransactionContext();
if (oldTx != null) {
TransactionManager tm = (TransactionManager) container.getTransactionManager();
- if (tm != null) {
+ if (tm != null) {
if (log.isDebugEnabled()) {
log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
}
- Transaction tx = tm.suspend();
- if (tx != oldTx) {
- throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
- }
- }
- }
- } catch (Exception e) {
- throw new MessagingException(e);
- }
+ Transaction tx = tm.suspend();
+ if (tx != oldTx) {
+ throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
}
-
+
protected void resumeTx(MessageExchangeImpl me) throws MessagingException {
- try {
- Transaction oldTx = me.getTransactionContext();
+ try {
+ Transaction oldTx = me.getTransactionContext();
if (oldTx != null) {
TransactionManager tm = (TransactionManager) container.getTransactionManager();
- if (tm != null) {
+ if (tm != null) {
if (log.isDebugEnabled()) {
log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
}
- tm.resume(oldTx);
- }
+ tm.resume(oldTx);
+ }
}
- } catch (Exception e) {
- throw new MessagingException(e);
- }
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
}
-
+
/**
* If the jbi container configured to do so, the message exchange will
* automatically be enlisted in the current transaction, if exists.
@@ -715,26 +767,27 @@
* @throws MessagingException
*/
protected void autoEnlistInTx(MessageExchangeImpl me) throws MessagingException {
- try {
- if (container.isAutoEnlistInTransaction()) {
- TransactionManager tm = (TransactionManager) container.getTransactionManager();
- if (tm != null) {
- Transaction tx = tm.getTransaction();
- if (tx != null) {
- Object oldTx = me.getTransactionContext();
- if (oldTx == null) {
- me.setTransactionContext(tx);
- } else if (oldTx != tx) {
- throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
- }
- }
- }
- }
- } catch (Exception e) {
- throw new MessagingException(e);
- }
+ try {
+ if (container.isAutoEnlistInTransaction()) {
+ TransactionManager tm = (TransactionManager) container.getTransactionManager();
+ if (tm != null) {
+ Transaction tx = tm.getTransaction();
+ if (tx != null) {
+ Object oldTx = me.getTransactionContext();
+ if (oldTx == null) {
+ me.setTransactionContext(tx);
+ } else if (oldTx != tx) {
+ throw new IllegalStateException(
+ "the transaction context set in the messageExchange is not bound to the current thread");
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
}
-
+
/**
* @return pretty print
*/