You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/10 16:38:14 UTC
svn commit: r564607 [5/12] - in
/incubator/servicemix/trunk/core/servicemix-core/src:
main/java/org/apache/servicemix/ main/java/org/apache/servicemix/jbi/
main/java/org/apache/servicemix/jbi/framework/
main/java/org/apache/servicemix/jbi/framework/sup...
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Fri Aug 10 07:37:46 2007
@@ -19,6 +19,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jbi.JBIException;
import javax.jbi.component.Component;
@@ -26,9 +31,9 @@
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -47,12 +52,6 @@
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* DeliveryChannel implementation
*
@@ -60,23 +59,32 @@
*/
public class DeliveryChannelImpl implements DeliveryChannel {
- private static final Log log = LogFactory.getLog(DeliveryChannelImpl.class);
+ private static final Log LOG = LogFactory.getLog(DeliveryChannelImpl.class);
private JBIContainer container;
+
private ComponentContextImpl context;
+
private ComponentMBeanImpl component;
+
private BlockingQueue<MessageExchangeImpl> queue;
+
private IdGenerator idGenerator = new IdGenerator();
+
private MessageExchangeFactory inboundFactory;
- private int intervalCount = 0;
+
+ private int intervalCount;
+
private AtomicBoolean closed = new AtomicBoolean(false);
+
private Map<Thread, Boolean> waiters = new ConcurrentHashMap<Thread, Boolean>();
- private TransactionManager transactionManager = null;
-
+
+ private TransactionManager transactionManager;
+
/**
- * When using clustering and sendSync, the exchange received will not be the same
- * as the one sent (because it has been serialized/deserialized.
- * We thus need to keep the original exchange in a map and override its state.
+ * When using clustering and sendSync, the exchange received will not be the
+ * same as the one sent (because it has been serialized/deserialized). We
+ * thus need to keep the original exchange in a map and override its state.
*/
private Map<String, MessageExchangeImpl> exchangesById = new ConcurrentHashMap<String, MessageExchangeImpl>();
@@ -104,13 +112,14 @@
*/
public void close() throws MessagingException {
if (this.closed.compareAndSet(false, true)) {
- if (log.isDebugEnabled()) {
- log.debug("Closing DeliveryChannel " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing DeliveryChannel " + this);
}
List<MessageExchangeImpl> pending = new ArrayList<MessageExchangeImpl>(queue.size());
queue.drainTo(pending);
for (MessageExchangeImpl messageExchange : pending) {
- if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ if (messageExchange.getTransactionContext() != null
+ && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
notifyExchange(messageExchange.getMirror(), messageExchange.getMirror(), "close");
}
}
@@ -125,7 +134,7 @@
try {
component.getContext().deactivateEndpoint(endpoints[i]);
} catch (JBIException e) {
- log.error("Error deactivating endpoint", e);
+ LOG.error("Error deactivating endpoint", e);
}
}
// TODO: Cause all accepts to return null
@@ -140,8 +149,8 @@
}
/**
- * Create a message exchange factory. This factory will create exchange instances with all appropriate properties
- * set to null.
+ * Create a message exchange factory. This factory will create exchange
+ * instances with all appropriate properties set to null.
*
* @return a message exchange factory
*/
@@ -155,34 +164,34 @@
QName serviceName = activationSpec.getDestinationService();
if (serviceName != null) {
result.setServiceName(serviceName);
- log.debug("default destination serviceName for " + componentName + " = " + serviceName);
+ LOG.debug("default destination serviceName for " + componentName + " = " + serviceName);
}
QName interfaceName = activationSpec.getDestinationInterface();
if (interfaceName != null) {
result.setInterfaceName(interfaceName);
- log.debug("default destination interfaceName for " + componentName + " = " + interfaceName);
+ LOG.debug("default destination interfaceName for " + componentName + " = " + interfaceName);
}
QName operationName = activationSpec.getDestinationOperation();
if (operationName != null) {
result.setOperationName(operationName);
- log.debug("default destination operationName for " + componentName + " = " + operationName);
+ LOG.debug("default destination operationName for " + componentName + " = " + operationName);
}
String endpointName = activationSpec.getDestinationEndpoint();
if (endpointName != null) {
boolean endpointSet = false;
- log.debug("default destination endpointName for " + componentName + " = " + endpointName);
+ LOG.debug("default destination endpointName for " + componentName + " = " + endpointName);
if (serviceName != null && endpointName != null) {
endpointName = endpointName.trim();
ServiceEndpoint endpoint = container.getRegistry().getEndpoint(serviceName, endpointName);
if (endpoint != null) {
result.setEndpoint(endpoint);
- log.info("Set default destination endpoint for " + componentName + " to " + endpoint);
+ LOG.info("Set default destination endpoint for " + componentName + " to " + endpoint);
endpointSet = true;
}
}
if (!endpointSet) {
- log.warn("Could not find destination endpoint for " + componentName + " service(" + serviceName
- + ") with endpointName " + endpointName);
+ LOG.warn("Could not find destination endpoint for " + componentName + " service(" + serviceName
+ + ") with endpointName " + endpointName);
}
}
}
@@ -192,8 +201,11 @@
/**
* Create a message exchange factory for the given interface name.
*
- * @param interfaceName name of the interface for which all exchanges created by the returned factory will be set
- * @return an exchange factory that will create exchanges for the given interface; must be non-null
+ * @param interfaceName
+ * name of the interface for which all exchanges created by the
+ * returned factory will be set
+ * @return an exchange factory that will create exchanges for the given
+ * interface; must be non-null
*/
public MessageExchangeFactory createExchangeFactory(QName interfaceName) {
MessageExchangeFactoryImpl result = createMessageExchangeFactory();
@@ -204,8 +216,11 @@
/**
* Create a message exchange factory for the given service name.
*
- * @param serviceName name of the service for which all exchanges created by the returned factory will be set
- * @return an exchange factory that will create exchanges for the given service; must be non-null
+ * @param serviceName
+ * name of the service for which all exchanges created by the
+ * returned factory will be set
+ * @return an exchange factory that will create exchanges for the given
+ * service; must be non-null
*/
public MessageExchangeFactory createExchangeFactoryForService(QName serviceName) {
MessageExchangeFactoryImpl result = createMessageExchangeFactory();
@@ -216,8 +231,11 @@
/**
* Create a message exchange factory for the given endpoint.
*
- * @param endpoint endpoint for which all exchanges created by the returned factory will be set for
- * @return an exchange factory that will create exchanges for the given endpoint
+ * @param endpoint
+ * endpoint for which all exchanges created by the returned
+ * factory will be set for
+ * @return an exchange factory that will create exchanges for the given
+ * endpoint
*/
public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
MessageExchangeFactoryImpl result = createMessageExchangeFactory();
@@ -254,36 +272,35 @@
// If the exchange has already timed out,
// do not give it to the component
if (me.getPacket().isAborted()) {
- if (log.isDebugEnabled()) {
- log.debug("Aborted " + me.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Aborted " + me.getExchangeId() + " in " + this);
}
me = null;
} else {
- if (log.isDebugEnabled()) {
- log.debug("Accepting " + me.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Accepting " + me.getExchangeId() + " in " + this);
}
- // If we have a tx lock and the exchange is not active, we need
+ // If we have a tx lock and the exchange is not active, we
+ // need
// to notify here without resuming transaction
if (me.getTxLock() != null && me.getStatus() != ExchangeStatus.ACTIVE) {
notifyExchange(me.getMirror(), me.getTxLock(), "acceptFinishedExchangeWithTxLock");
me.handleAccept();
- if (log.isTraceEnabled()) {
- log.trace("Accepted: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Accepted: " + me);
}
- }
// We transactionally deliver a finished exchange
- else if (me.isTransacted() && me.getStatus() != ExchangeStatus.ACTIVE) {
+ } else if (me.isTransacted() && me.getStatus() != ExchangeStatus.ACTIVE) {
// Do not resume transaction
me.handleAccept();
- if (log.isTraceEnabled()) {
- log.trace("Accepted: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Accepted: " + me);
}
- }
- else {
+ } else {
resumeTx(me);
me.handleAccept();
- if (log.isTraceEnabled()) {
- log.trace("Accepted: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Accepted: " + me);
}
}
}
@@ -296,17 +313,16 @@
try {
l[i].exchangeAccepted(event);
} catch (Exception e) {
- log.warn("Error calling listener: " + e.getMessage(), e);
+ LOG.warn("Error calling listener: " + e.getMessage(), e);
}
}
}
return me;
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new MessagingException("accept failed", e);
}
}
-
+
protected void autoSetPersistent(MessageExchangeImpl me) {
Boolean persistent = me.getPersistent();
if (persistent == null) {
@@ -318,16 +334,15 @@
me.setPersistent(persistent);
}
}
-
+
protected void throttle() {
if (component.isExchangeThrottling()) {
if (component.getThrottlingInterval() > intervalCount) {
intervalCount = 0;
try {
Thread.sleep(component.getThrottlingTimeout());
- }
- catch (InterruptedException e) {
- log.warn("throttling failed", e);
+ } catch (InterruptedException e) {
+ LOG.warn("throttling failed", e);
}
}
intervalCount++;
@@ -338,8 +353,8 @@
MessageExchangeImpl mirror = me.getMirror();
boolean finished = me.getStatus() != ExchangeStatus.ACTIVE;
try {
- if (log.isTraceEnabled()) {
- log.trace("Sent: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sent: " + me);
}
// If the message has timed out
if (me.getPacket().isAborted()) {
@@ -363,25 +378,24 @@
try {
l[i].exchangeSent(event);
} catch (Exception e) {
- log.warn("Error calling listener: " + e.getMessage(), e);
+ LOG.warn("Error calling listener: " + e.getMessage(), e);
}
}
// Change ownership
me.handleSend(sync);
mirror.setTxState(MessageExchangeImpl.TX_STATE_NONE);
- // If this is the DONE or ERROR status from a synchronous transactional exchange,
- // it should not be part of the transaction, so remove the tx context
- if (finished &&
- me.getTxLock() == null &&
- me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED &&
- me.isPushDelivery() == false &&
- me.getRole() == Role.CONSUMER) {
+ // If this is the DONE or ERROR status from a synchronous
+ // transactional exchange,
+ // it should not be part of the transaction, so remove the tx
+ // context
+ if (finished && me.getTxLock() == null && me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED
+ && !me.isPushDelivery() && me.getRole() == Role.CONSUMER) {
me.setTransactionContext(null);
}
container.sendExchange(mirror);
} catch (MessagingException e) {
- if (log.isDebugEnabled()) {
- log.debug("Exception processing: " + me.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception processing: " + me.getExchangeId() + " in " + this);
}
throw e;
} finally {
@@ -407,8 +421,8 @@
// If the delivery channel has been closed
checkNotClosed();
// Log call
- if (log.isDebugEnabled()) {
- log.debug("Send " + messageExchange.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Send " + messageExchange.getExchangeId() + " in " + this);
}
// // JBI 5.5.2.1.3: remove sync property
messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
@@ -440,8 +454,8 @@
// If the delivery channel has been closed
checkNotClosed();
// Log call
- if (log.isDebugEnabled()) {
- log.debug("SendSync " + messageExchange.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SendSync " + messageExchange.getExchangeId() + " in " + this);
}
boolean result = false;
// JBI 5.5.2.1.3: set the sendSync property
@@ -457,8 +471,8 @@
if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
waitForExchange(me, me, timeout, "sendSync");
} else {
- if (log.isDebugEnabled()) {
- log.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)");
}
}
}
@@ -468,14 +482,14 @@
// the message has been delivered in the same thread
// so there is no need to resume the transaction
// See processInBound
- //if (messageExchangeImpl.getSyncSenderThread() != null) {
- resumeTx(me);
- //}
+ // if (messageExchangeImpl.getSyncSenderThread() != null) {
+ resumeTx(me);
+ // }
result = true;
} else {
// JBI 5.5.2.1.3: the exchange should be set to ERROR status
- if (log.isDebugEnabled()) {
- log.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted");
}
me.getPacket().setAborted(true);
result = false;
@@ -483,7 +497,7 @@
} catch (InterruptedException e) {
throw new MessagingException(e);
} catch (RuntimeException e) {
-// e.printStackTrace();
+ // e.printStackTrace();
throw e;
} finally {
exchangesById.remove(exchangeKey);
@@ -499,7 +513,8 @@
}
/**
- * @param container The container to set.
+ * @param container
+ * The container to set.
*/
public void setContainer(JBIContainer container) {
this.container = container;
@@ -537,8 +552,8 @@
* @throws MessagingException
*/
public void processInBound(MessageExchangeImpl me) throws MessagingException {
- if (log.isTraceEnabled()) {
- log.trace("Processing inbound exchange: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing inbound exchange: " + me);
}
// Check if the delivery channel has been closed
checkNotClosed();
@@ -548,19 +563,21 @@
original.copyFrom(me);
me = original;
}
- // Check if the incoming exchange is a response to a synchronous exchange previously sent
- // In this case, we do not have to queue it, but rather notify the waiting thread.
+ // Check if the incoming exchange is a response to a synchronous
+ // exchange previously sent
+ // In this case, we do not have to queue it, but rather notify the
+ // waiting thread.
if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
// If the mirror has been delivered using push, better wait until
- // the push call return. This can only work if not using clustered flows,
+ // the push call return. This can only work if not using clustered
+ // flows,
// but the flag is transient so we do not care.
- /*if (!me.getMirror().isPushDelivery())*/ {
- // Ensure that data is uptodate with the incoming exchange (in case the exchange has
- // been serialized / deserialized by a clustered flow)
- suspendTx(original);
- me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
- notifyExchange(original, original, "processInboundSynchronousExchange");
- }
+ // Ensure that data is uptodate with the incoming exchange (in
+ // case the exchange has
+ // been serialized / deserialized by a clustered flow)
+ suspendTx(original);
+ me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
+ notifyExchange(original, original, "processInboundSynchronousExchange");
return;
}
@@ -570,8 +587,8 @@
MessageExchangeListener listener = getExchangeListener();
if (listener != null) {
me.handleAccept();
- if (log.isTraceEnabled()) {
- log.trace("Received: " + me);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received: " + me);
}
// Call input listeners
ExchangeListener[] l = (ExchangeListener[]) container.getListeners(ExchangeListener.class);
@@ -580,7 +597,7 @@
try {
l[i].exchangeAccepted(event);
} catch (Exception e) {
- log.warn("Error calling listener: " + e.getMessage(), e);
+ LOG.warn("Error calling listener: " + e.getMessage(), e);
}
}
// Set the flag that the exchange was delivered using push mode
@@ -594,32 +611,33 @@
} finally {
Thread.currentThread().setContextClassLoader(old);
}
- // TODO: handle delayed exchange notifications
+ // TODO: handle delayed exchange notifications
return;
}
-
+
// Component uses pull delivery.
-
+
// If the exchange is transacted, special care should be taken.
// But if the exchange is no more ACTIVE, just queue it, as
// we will never have an answer back.
if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) {
// If the transaction is conveyed by the exchange
- // We do not need to resume the transaction in this thread
+ // We do not need to resume the transaction in this thread
if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
try {
suspendTx(me);
queue.put(me);
} catch (InterruptedException e) {
- log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+ LOG.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
me.getPacket().setAborted(true);
}
- }
// Else the delivery / send are enlisted in the current tx.
- // We must suspend the transaction, queue it, and wait for the answer
- // to be sent, at which time the tx should be suspended and resumed in
+ // We must suspend the transaction, queue it, and wait for the
+ // answer
+ // to be sent, at which time the tx should be suspended and resumed
+ // in
// this thread.
- else {
+ } else {
Object lock = new Object();
synchronized (lock) {
try {
@@ -628,7 +646,7 @@
queue.put(me);
waitForExchange(me, lock, 0, "processInboundTransactionalExchange");
} catch (InterruptedException e) {
- log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+ LOG.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
me.getPacket().setAborted(true);
} finally {
me.setTxLock(null);
@@ -636,24 +654,24 @@
}
}
}
- }
- // If the exchange is ACTIVE, the transaction boundary will suspended when the
+ // If the exchange is ACTIVE, the transaction boundary will be suspended
+ // when the
// answer is sent
// Else just queue the exchange
- else {
+ } else {
try {
queue.put(me);
} catch (InterruptedException e) {
- log.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
+ LOG.debug("Exchange " + me.getExchangeId() + " aborted due to thread interruption", e);
me.getPacket().setAborted(true);
}
}
}
-
+
protected MessageExchangeListener getExchangeListener() {
- Component component = this.component.getComponent();
- if (component instanceof MessageExchangeListener) {
- return (MessageExchangeListener) component;
+ Component comp = this.component.getComponent();
+ if (comp instanceof MessageExchangeListener) {
+ return (MessageExchangeListener) comp;
}
ComponentLifeCycle lifecycle = this.component.getLifeCycle();
if (lifecycle instanceof MessageExchangeListener) {
@@ -661,17 +679,19 @@
}
return null;
}
-
+
/**
- * Synchronization must be performed on the given exchange when calling this method
+ * Synchronization must be performed on the given exchange when calling this
+ * method
*
* @param me
* @throws InterruptedException
*/
protected void waitForExchange(MessageExchangeImpl me, Object lock, long timeout, String from) throws InterruptedException {
// If the channel is closed while here, we must abort
- if (log.isDebugEnabled()) {
- log.debug("Waiting for exchange " + me.getExchangeId() + " (" + Integer.toHexString(me.hashCode()) + ") to be answered in " + this + " from " + from);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for exchange " + me.getExchangeId() + " (" + Integer.toHexString(me.hashCode()) + ") to be answered in "
+ + this + " from " + from);
}
Thread th = Thread.currentThread();
try {
@@ -680,14 +700,15 @@
} finally {
waiters.remove(th);
}
- if (log.isDebugEnabled()) {
- log.debug("Notified: " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notified: " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
}
}
-
+
protected void notifyExchange(MessageExchangeImpl me, Object lock, String from) {
- if (log.isDebugEnabled()) {
- log.debug("Notifying exchange " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from " + from);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notifying exchange " + me.getExchangeId() + "(" + Integer.toHexString(me.hashCode()) + ") in " + this + " from "
+ + from);
}
synchronized (lock) {
lock.notify();
@@ -711,16 +732,17 @@
try {
Transaction oldTx = me.getTransactionContext();
if (oldTx != null) {
- if (log.isDebugEnabled()) {
- log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
}
Transaction tx = transactionManager.suspend();
if (tx != oldTx) {
- throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
+ throw new IllegalStateException(
+ "the transaction context set in the messageExchange is not bound to the current thread");
}
}
} catch (Exception e) {
- log.info("Exchange " + me.getExchangeId() + " aborted due to transaction exception", e);
+ LOG.info("Exchange " + me.getExchangeId() + " aborted due to transaction exception", e);
me.getPacket().setAborted(true);
}
}
@@ -731,8 +753,8 @@
try {
Transaction oldTx = me.getTransactionContext();
if (oldTx != null) {
- if (log.isDebugEnabled()) {
- log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
}
transactionManager.resume(oldTx);
}
@@ -744,7 +766,7 @@
/**
* If the jbi container is configured to do so, the message exchange will
- * automatically be enlisted in the current transaction, if exists.
+ * automatically be enlisted in the current transaction, if exists.
*
* @throws MessagingException
*/
@@ -758,7 +780,7 @@
me.setTransactionContext(tx);
} else if (oldTx != tx) {
throw new IllegalStateException(
- "the transaction context set in the messageExchange is not bound to the current thread");
+ "the transaction context set in the messageExchange is not bound to the current thread");
}
}
} catch (Exception e) {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/ExchangePacket.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/ExchangePacket.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/ExchangePacket.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/ExchangePacket.java Fri Aug 10 07:37:46 2007
@@ -16,17 +16,6 @@
*/
package org.apache.servicemix.jbi.messaging;
-import org.apache.servicemix.components.util.CopyTransformer;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.transaction.Transaction;
-import javax.xml.namespace.QName;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
@@ -41,34 +30,60 @@
import java.util.Map;
import java.util.Set;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.transaction.Transaction;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.components.util.CopyTransformer;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+
/**
* ExchangePacket is responsible for carrying MessageExchange payloads
*
* @version $Revision$
*/
public class ExchangePacket implements Externalizable {
-
+
private static final long serialVersionUID = -9110837382914609624L;
-
+
protected URI pattern;
+
protected String exchangeId;
+
protected ComponentNameSpace destinationId;
+
protected ComponentNameSpace sourceId;
+
protected ExchangeStatus status = ExchangeStatus.ACTIVE;
+
protected QName serviceName;
+
protected QName interfaceName;
+
protected QName operationName;
+
protected Exception error;
+
protected Map properties;
+
protected NormalizedMessageImpl in;
+
protected NormalizedMessageImpl out;
+
protected FaultImpl fault;
+
protected ServiceEndpoint endpoint;
+
protected transient Transaction transactionContext;
+
protected Boolean persistent;
+
protected boolean aborted;
-
public ExchangePacket() {
}
@@ -76,7 +91,7 @@
this.destinationId = packet.destinationId;
this.endpoint = null; // packet.endpoint;
this.error = null;
- this.exchangeId = null; //???;
+ this.exchangeId = null; // ???;
this.interfaceName = packet.interfaceName;
CopyTransformer ct = new CopyTransformer();
if (packet.in != null) {
@@ -111,7 +126,8 @@
}
/**
- * @param endpoint The endpoint to set.
+ * @param endpoint
+ * The endpoint to set.
*/
public void setEndpoint(ServiceEndpoint endpoint) {
this.endpoint = endpoint;
@@ -125,7 +141,8 @@
}
/**
- * @param transactionContext The transactionContext to set.
+ * @param transactionContext
+ * The transactionContext to set.
*/
public void setTransactionContext(Transaction transactionContext) {
this.transactionContext = transactionContext;
@@ -139,7 +156,8 @@
}
/**
- * @param interfaceName The interfaceName to set.
+ * @param interfaceName
+ * The interfaceName to set.
*/
public void setInterfaceName(QName interfaceName) {
this.interfaceName = interfaceName;
@@ -153,7 +171,8 @@
}
/**
- * @param operationName The operationName to set.
+ * @param operationName
+ * The operationName to set.
*/
public void setOperationName(QName operationName) {
this.operationName = operationName;
@@ -167,14 +186,16 @@
}
/**
- * @param serviceName The serviceName to set.
+ * @param serviceName
+ * The serviceName to set.
*/
public void setServiceName(QName serviceName) {
this.serviceName = serviceName;
}
/**
- * @param status The status to set.
+ * @param status
+ * The status to set.
*/
public void setStatus(ExchangeStatus status) {
this.status = status;
@@ -195,7 +216,8 @@
}
/**
- * @param pattern The pattern to set.
+ * @param pattern
+ * The pattern to set.
*/
public void setPattern(URI pattern) {
this.pattern = pattern;
@@ -209,7 +231,8 @@
}
/**
- * @param error The error to set.
+ * @param error
+ * The error to set.
*/
public void setError(Exception error) {
this.error = error;
@@ -224,7 +247,8 @@
}
/**
- * @param exchangeId The exchangeId to set.
+ * @param exchangeId
+ * The exchangeId to set.
*/
public void setExchangeId(String exchangeId) {
this.exchangeId = exchangeId;
@@ -287,7 +311,8 @@
}
/**
- * @param sourceId The sourceId to set.
+ * @param sourceId
+ * The sourceId to set.
*/
public void setSourceId(ComponentNameSpace sourceId) {
this.sourceId = sourceId;
@@ -301,7 +326,8 @@
}
/**
- * @param destinationId The destinationId to set.
+ * @param destinationId
+ * The destinationId to set.
*/
public void setDestinationId(ComponentNameSpace destinationId) {
this.destinationId = destinationId;
@@ -315,7 +341,8 @@
}
/**
- * @param fault The fault to set.
+ * @param fault
+ * The fault to set.
*/
public void setFault(FaultImpl fault) {
this.fault = fault;
@@ -329,7 +356,8 @@
}
/**
- * @param in The in to set.
+ * @param in
+ * The in to set.
*/
public void setIn(NormalizedMessageImpl in) {
this.in = in;
@@ -343,7 +371,8 @@
}
/**
- * @param out The out to set.
+ * @param out
+ * The out to set.
*/
public void setOut(NormalizedMessageImpl out) {
this.out = out;
@@ -358,6 +387,7 @@
/**
* Write to a Stream
+ *
* @param output
* @throws IOException
*/
@@ -407,19 +437,20 @@
/**
* Creates a copy of the packet so it can be sent to another destination
- * @throws MessagingException
+ *
+ * @throws MessagingException
*/
public ExchangePacket copy() throws MessagingException {
return new ExchangePacket(this);
}
- public Boolean getPersistent() {
- return persistent;
- }
-
- public void setPersistent(Boolean persistent) {
- this.persistent = persistent;
- }
+ public Boolean getPersistent() {
+ return persistent;
+ }
+
+ public void setPersistent(Boolean persistent) {
+ this.persistent = persistent;
+ }
public boolean isAborted() {
return aborted;
@@ -428,23 +459,26 @@
public void setAborted(boolean timedOut) {
this.aborted = timedOut;
}
-
+
/**
* Retrieve the serialized form of this packet
+ *
* @return the serialized packet
- * @throws IOException
+ * @throws IOException
*/
public byte[] getData() throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer);
- out.writeObject(this);
- out.close();
+ ObjectOutputStream os = new ObjectOutputStream(buffer);
+ os.writeObject(this);
+ os.close();
return buffer.toByteArray();
}
-
+
/**
* Deserialize an ExchangePacket.
- * @param data the serialized packet
+ *
+ * @param data
+ * the serialized packet
* @return the deserialized packet
* @throws IOException
* @throws ClassNotFoundException
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/FaultImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/FaultImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/FaultImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/FaultImpl.java Fri Aug 10 07:37:46 2007
@@ -20,7 +20,7 @@
/**
* Fault implementation
- *
+ *
* @version $Revision$
*/
public class FaultImpl extends NormalizedMessageImpl implements Fault {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOnlyImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOnlyImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOnlyImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOnlyImpl.java Fri Aug 10 07:37:46 2007
@@ -16,50 +16,51 @@
*/
package org.apache.servicemix.jbi.messaging;
-import javax.jbi.messaging.InOnly;
-
import java.io.IOException;
import java.io.ObjectInput;
+import javax.jbi.messaging.InOnly;
+
/**
* InOnly message exchange.
- *
+ *
* @version $Revision$
*/
public class InOnlyImpl extends MessageExchangeImpl implements InOnly {
-
+
private static final long serialVersionUID = -4851111881482457905L;
-
- private static int[][] STATES_CONSUMER = {
- { CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
- { CAN_CONSUMER, -1, -1, 2, 2 },
- { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
+
+ private static final int[][] STATES_CONSUMER = {
+ {CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
+ {CAN_CONSUMER, -1, -1, 2, 2 },
+ {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 }
};
- private static int[][] STATES_PROVIDER = {
- { CAN_PROVIDER, 1, -1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 2, 2 },
- { CAN_PROVIDER, -1, -1, -1, -1 },
+
+ private static final int[][] STATES_PROVIDER = {
+ {CAN_PROVIDER, 1, -1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_STATUS_DONE + CAN_STATUS_ERROR, -1, -1, 2, 2 },
+ {CAN_PROVIDER, -1, -1, -1, -1 }
};
-
+
public InOnlyImpl() {
}
-
+
public InOnlyImpl(String exchangeId) {
super(exchangeId, MessageExchangeSupport.IN_ONLY, STATES_CONSUMER);
this.mirror = new InOnlyImpl(this);
}
-
+
public InOnlyImpl(ExchangePacket packet) {
super(packet, STATES_CONSUMER);
this.mirror = new InOnlyImpl(this);
}
-
+
protected InOnlyImpl(InOnlyImpl mep) {
super(mep.packet, STATES_PROVIDER);
this.mirror = mep;
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.packet = new ExchangePacket();
this.packet.readExternal(in);
if (this.packet.in != null) {
@@ -85,6 +86,5 @@
this.mirror.states = STATES_PROVIDER;
}
}
-
}
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOptionalOutImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOptionalOutImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOptionalOutImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOptionalOutImpl.java Fri Aug 10 07:37:46 2007
@@ -16,61 +16,63 @@
*/
package org.apache.servicemix.jbi.messaging;
-import javax.jbi.messaging.InOptionalOut;
-
import java.io.IOException;
import java.io.ObjectInput;
+import javax.jbi.messaging.InOptionalOut;
+
/**
* InOptionalOut message exchange.
- *
+ *
* @version $Revision$
*/
public class InOptionalOutImpl extends MessageExchangeImpl implements InOptionalOut {
-
+
private static final long serialVersionUID = -3145649037372074912L;
- private static int[][] STATES_CONSUMER = {
- { CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1},
- { CAN_CONSUMER, 2, 3, 4, 4 },
- { CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 5, 6, 6},
- { CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 6, 6},
- { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
- { CAN_CONSUMER, -1, -1, 7, 7 },
- { CAN_CONSUMER, -1, -1, -1, -1 },
- { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
+ private static final int[][] STATES_CONSUMER = {
+ {CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
+ {CAN_CONSUMER, 2, 3, 4, 4 },
+ {CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_SET_FAULT_MSG + CAN_STATUS_ACTIVE
+ + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, 5, 6, 6 },
+ {CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 6, 6 },
+ {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
+ {CAN_CONSUMER, -1, -1, 7, 7 },
+ {CAN_CONSUMER, -1, -1, -1, -1 },
+ {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 }
};
-
- private static int[][] STATES_PROVIDER = {
- { CAN_PROVIDER, 1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER + CAN_SET_OUT_MSG + CAN_SET_FAULT_MSG + CAN_SEND + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR + CAN_STATUS_DONE, 2, 3, 4, 4 },
- { CAN_PROVIDER, -1, 5, 6, 6 },
- { CAN_PROVIDER, -1, -1, 6, 6 },
- { CAN_PROVIDER, -1, -1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 7, 7 },
- { CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
- { CAN_PROVIDER, -1, -1, -1, -1 },
+
+ private static final int[][] STATES_PROVIDER = {
+ {CAN_PROVIDER, 1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER + CAN_SET_OUT_MSG + CAN_SET_FAULT_MSG + CAN_SEND
+ + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR + CAN_STATUS_DONE, 2, 3, 4, 4 },
+ {CAN_PROVIDER, -1, 5, 6, 6 },
+ {CAN_PROVIDER, -1, -1, 6, 6 },
+ {CAN_PROVIDER, -1, -1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 7, 7 },
+ {CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
+ {CAN_PROVIDER, -1, -1, -1, -1 }
};
public InOptionalOutImpl() {
}
-
+
public InOptionalOutImpl(String exchangeId) {
super(exchangeId, MessageExchangeSupport.IN_OPTIONAL_OUT, STATES_CONSUMER);
this.mirror = new InOptionalOutImpl(this);
}
-
+
public InOptionalOutImpl(ExchangePacket packet) {
super(packet, STATES_CONSUMER);
this.mirror = new InOptionalOutImpl(this);
}
-
+
protected InOptionalOutImpl(InOptionalOutImpl mep) {
super(mep.packet, STATES_PROVIDER);
this.mirror = mep;
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.packet = new ExchangePacket();
this.packet.readExternal(in);
if (this.packet.in != null) {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOutImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOutImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOutImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/InOutImpl.java Fri Aug 10 07:37:46 2007
@@ -16,55 +16,56 @@
*/
package org.apache.servicemix.jbi.messaging;
-import javax.jbi.messaging.InOut;
-
import java.io.IOException;
import java.io.ObjectInput;
+import javax.jbi.messaging.InOut;
+
/**
* InOut message exchange.
- *
+ *
* @version $Revision$
*/
public class InOutImpl extends MessageExchangeImpl implements InOut {
-
+
private static final long serialVersionUID = -1639492357707831113L;
- private static int[][] STATES_CONSUMER = {
- { CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1},
- { CAN_CONSUMER, 2, 2, 3, 3 },
- { CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 4, 4},
- { CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
- { CAN_CONSUMER, -1, -1, -1, -1 },
+ private static final int[][] STATES_CONSUMER = {
+ {CAN_CONSUMER + CAN_OWNER + CAN_SET_IN_MSG + CAN_SEND + CAN_STATUS_ACTIVE, 1, -1, -1, -1 },
+ {CAN_CONSUMER, 2, 2, 3, 3 },
+ {CAN_CONSUMER + CAN_OWNER + CAN_SEND + CAN_STATUS_ERROR + CAN_STATUS_DONE, -1, -1, 4, 4 },
+ {CAN_CONSUMER + CAN_OWNER, -1, -1, -1, -1 },
+ {CAN_CONSUMER, -1, -1, -1, -1 }
};
-
- private static int[][] STATES_PROVIDER = {
- { CAN_PROVIDER, 1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER + CAN_SET_OUT_MSG + CAN_SET_FAULT_MSG + CAN_SEND + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR, 2, 2, 3, -1 },
- { CAN_PROVIDER, -1, -1, 4, 4 },
- { CAN_PROVIDER, -1, -1, -1, -1 },
- { CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 },
+
+ private static final int[][] STATES_PROVIDER = {
+ {CAN_PROVIDER, 1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER + CAN_SET_OUT_MSG + CAN_SET_FAULT_MSG + CAN_SEND
+ + CAN_STATUS_ACTIVE + CAN_STATUS_ERROR, 2, 2, 3, -1 },
+ {CAN_PROVIDER, -1, -1, 4, 4 },
+ {CAN_PROVIDER, -1, -1, -1, -1 },
+ {CAN_PROVIDER + CAN_OWNER, -1, -1, -1, -1 }
};
-
+
public InOutImpl() {
}
-
+
public InOutImpl(String exchangeId) {
super(exchangeId, MessageExchangeSupport.IN_OUT, STATES_CONSUMER);
this.mirror = new InOutImpl(this);
}
-
+
public InOutImpl(ExchangePacket packet) {
super(packet, STATES_CONSUMER);
this.mirror = new InOutImpl(this);
}
-
+
protected InOutImpl(InOutImpl mep) {
super(mep.packet, STATES_PROVIDER);
this.mirror = mep;
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.packet = new ExchangePacket();
this.packet.readExternal(in);
if (this.packet.in != null) {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java Fri Aug 10 07:37:46 2007
@@ -16,13 +16,11 @@
*/
package org.apache.servicemix.jbi.messaging;
+import java.net.URI;
import java.text.SimpleDateFormat;
+import java.util.GregorianCalendar;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.id.IdGenerator;
-import org.apache.servicemix.jbi.framework.ComponentContextImpl;
-
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOptionalOut;
import javax.jbi.messaging.InOut;
@@ -33,33 +31,41 @@
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
-import java.net.URI;
-import java.util.GregorianCalendar;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.id.IdGenerator;
+import org.apache.servicemix.jbi.framework.ComponentContextImpl;
/**
* Resolver for URI patterns
- *
+ *
* @version $Revision$
*/
public class MessageExchangeFactoryImpl implements MessageExchangeFactory {
private QName interfaceName;
+
private QName serviceName;
+
private QName operationName;
+
private ServiceEndpoint endpoint;
+
private IdGenerator idGenerator;
+
private ComponentContextImpl context;
+
private AtomicBoolean closed;
/**
* Constructor for a factory
+ *
* @param idGen
*/
- public MessageExchangeFactoryImpl(IdGenerator idGen, AtomicBoolean closed){
+ public MessageExchangeFactoryImpl(IdGenerator idGen, AtomicBoolean closed) {
this.idGenerator = idGen;
this.closed = closed;
}
-
+
protected void checkNotClosed() throws MessagingException {
if (closed.get()) {
throw new MessagingException("DeliveryChannel has been closed.");
@@ -68,7 +74,7 @@
/**
* Create an exchange from the specified pattern
- *
+ *
* @param pattern
* @return MessageExchange
* @throws MessagingException
@@ -77,20 +83,15 @@
checkNotClosed();
MessageExchange result = null;
if (pattern != null) {
- if (pattern.equals(MessageExchangeSupport.IN_ONLY) ||
- pattern.equals(MessageExchangeSupport.WSDL2_IN_ONLY)) {
+ if (pattern.equals(MessageExchangeSupport.IN_ONLY) || pattern.equals(MessageExchangeSupport.WSDL2_IN_ONLY)) {
result = createInOnlyExchange();
- }
- else if (pattern.equals(MessageExchangeSupport.IN_OUT) ||
- pattern.equals(MessageExchangeSupport.WSDL2_IN_OUT)) {
+ } else if (pattern.equals(MessageExchangeSupport.IN_OUT) || pattern.equals(MessageExchangeSupport.WSDL2_IN_OUT)) {
result = createInOutExchange();
- }
- else if (pattern.equals(MessageExchangeSupport.IN_OPTIONAL_OUT) ||
- pattern.equals(MessageExchangeSupport.WSDL2_IN_OPTIONAL_OUT)) {
+ } else if (pattern.equals(MessageExchangeSupport.IN_OPTIONAL_OUT)
+ || pattern.equals(MessageExchangeSupport.WSDL2_IN_OPTIONAL_OUT)) {
result = createInOptionalOutExchange();
- }
- else if (pattern.equals(MessageExchangeSupport.ROBUST_IN_ONLY) ||
- pattern.equals(MessageExchangeSupport.WSDL2_ROBUST_IN_ONLY)) {
+ } else if (pattern.equals(MessageExchangeSupport.ROBUST_IN_ONLY)
+ || pattern.equals(MessageExchangeSupport.WSDL2_ROBUST_IN_ONLY)) {
result = createRobustInOnlyExchange();
}
}
@@ -102,33 +103,33 @@
/**
* create InOnly exchange
- *
+ *
* @return InOnly exchange
* @throws MessagingException
*/
public InOnly createInOnlyExchange() throws MessagingException {
checkNotClosed();
- InOnlyImpl result = new InOnlyImpl(getExchangeId());
+ InOnlyImpl result = new InOnlyImpl(getExchangeId());
setDefaults(result);
return result;
}
/**
* create RobustInOnly exchange
- *
+ *
* @return RobustInOnly exchange
* @throws MessagingException
*/
public RobustInOnly createRobustInOnlyExchange() throws MessagingException {
checkNotClosed();
- RobustInOnlyImpl result = new RobustInOnlyImpl(getExchangeId());
+ RobustInOnlyImpl result = new RobustInOnlyImpl(getExchangeId());
setDefaults(result);
return result;
}
/**
* create InOut Exchange
- *
+ *
* @return InOut exchange
* @throws MessagingException
*/
@@ -141,86 +142,91 @@
/**
* create InOptionalOut exchange
- *
+ *
* @return InOptionalOut exchange
* @throws MessagingException
*/
public InOptionalOut createInOptionalOutExchange() throws MessagingException {
checkNotClosed();
- InOptionalOutImpl result = new InOptionalOutImpl(getExchangeId());
+ InOptionalOutImpl result = new InOptionalOutImpl(getExchangeId());
setDefaults(result);
return result;
}
/**
- * Create an exchange that points at an endpoint that conforms to the declared capabilities, requirements, and
- * policies of both the consumer and the provider.
- *
- * @param serviceName
- * @param operationName the WSDL name of the operation to be performed
- * @return a message exchange that is initialized with given interfaceName, operationName, and the endpoint decided
- * upon by JBI.
+ * Create an exchange that points at an endpoint that conforms to the
+ * declared capabilities, requirements, and policies of both the consumer
+ * and the provider.
+ *
+ * @param svcName
+ * @param opName
+ * the WSDL name of the operation to be performed
+ * @return a message exchange that is initialized with given interfaceName,
+ * operationName, and the endpoint decided upon by JBI.
* @throws MessagingException
*/
- public MessageExchange createExchange(QName serviceName, QName operationName) throws MessagingException {
+ public MessageExchange createExchange(QName svcName, QName opName) throws MessagingException {
// TODO: look for the operation in the wsdl and infer the MEP
checkNotClosed();
- InOptionalOutImpl me = new InOptionalOutImpl(getExchangeId());
+ InOptionalOutImpl me = new InOptionalOutImpl(getExchangeId());
setDefaults(me);
- me.setService(serviceName);
- me.setOperation(operationName);
+ me.setService(svcName);
+ me.setOperation(opName);
return me;
}
protected String getExchangeId() {
return idGenerator.generateId();
}
-
+
/**
* @return endpoint
*/
public ServiceEndpoint getEndpoint() {
return endpoint;
}
-
+
/**
* set endpoint
+ *
* @param endpoint
*/
public void setEndpoint(ServiceEndpoint endpoint) {
this.endpoint = endpoint;
}
-
+
/**
* @return interface name
*/
public QName getInterfaceName() {
return interfaceName;
}
-
+
/**
* set interface name
+ *
* @param interfaceName
*/
public void setInterfaceName(QName interfaceName) {
this.interfaceName = interfaceName;
}
-
+
/**
* @return service name
*/
public QName getServiceName() {
return serviceName;
}
-
+
/**
* set service name
+ *
* @param serviceName
*/
public void setServiceName(QName serviceName) {
this.serviceName = serviceName;
}
-
+
/**
* @return Returns the operationName.
*/
@@ -228,16 +234,17 @@
return operationName;
}
-
/**
- * @param operationName The operationName to set.
+ * @param operationName
+ * The operationName to set.
*/
public void setOperationName(QName operationName) {
this.operationName = operationName;
}
/**
- * Get the Context
+ * Get the Context
+ *
* @return the context
*/
public ComponentContextImpl getContext() {
@@ -246,6 +253,7 @@
/**
* Set the Context
+ *
* @param context
*/
public void setContext(ComponentContextImpl context) {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeImpl.java Fri Aug 10 07:37:46 2007
@@ -16,14 +16,13 @@
*/
package org.apache.servicemix.jbi.messaging;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.JbiConstants;
-import org.apache.servicemix.jbi.container.ActivationSpec;
-import org.apache.servicemix.jbi.framework.ComponentContextImpl;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.jaxp.SourceTransformer;
-import org.w3c.dom.Node;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.Set;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.Fault;
@@ -35,82 +34,117 @@
import javax.xml.namespace.QName;
import javax.xml.transform.dom.DOMSource;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URI;
-import java.util.Comparator;
-import java.util.Set;
+import org.w3c.dom.Node;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.framework.ComponentContextImpl;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
/**
- * A simple message exchange declaration. This is partial, just giving us enough ME function for the doodle. This
- * doesn't add anything new to the current MessageExchange definition.
- *
+ * A simple message exchange declaration. This is partial, just giving us enough
+ * ME function for the doodle. This doesn't add anything new to the current
+ * MessageExchange definition.
+ *
* @version $Revision$
*/
public abstract class MessageExchangeImpl implements MessageExchange, Externalizable {
+ public static final String IN = "in";
+
+ public static final String OUT = "out";
+
+ public static final String FAULT = "fault";
+
+ public static final int MAX_MSG_DISPLAY_SIZE = 1500;
+
+ public static final boolean PRESERVE_CONTENT = Boolean.getBoolean("org.apache.servicemix.preserveContent");
+
public static final int SYNC_STATE_ASYNC = 0;
+
public static final int SYNC_STATE_SYNC_SENT = 1;
+
public static final int SYNC_STATE_SYNC_RECEIVED = 2;
-
+
/**
- * Exchange is not transactional
+ * Exchange is not transactional
*/
public static final int TX_STATE_NONE = 0;
+
/**
- * Exchange has been enlisted in the current transaction.
- * This means that the transaction must be commited for
- * the exchange to be delivered.
+ * Exchange has been enlisted in the current transaction. This means that
+ * the transaction must be commited for the exchange to be delivered.
*/
public static final int TX_STATE_ENLISTED = 1;
+
/**
- * Transaction is being conveyed by the exchange.
- * The transaction context will be given to the
- * target component.
+ * Transaction is being conveyed by the exchange. The transaction context
+ * will be given to the target component.
*/
public static final int TX_STATE_CONVEYED = 2;
- protected static final int CAN_SET_IN_MSG = 0x00000001;
- protected static final int CAN_SET_OUT_MSG = 0x00000002;
- protected static final int CAN_SET_FAULT_MSG = 0x00000004;
- protected static final int CAN_PROVIDER = 0x00000008;
- protected static final int CAN_CONSUMER = 0x00000000;
- protected static final int CAN_SEND = 0x00000010;
- protected static final int CAN_STATUS_ACTIVE = 0x00000040;
- protected static final int CAN_STATUS_DONE = 0x00000080;
- protected static final int CAN_STATUS_ERROR = 0x00000100;
- protected static final int CAN_OWNER = 0x00000200;
+ protected static final int CAN_SET_IN_MSG = 0x00000001;
+
+ protected static final int CAN_SET_OUT_MSG = 0x00000002;
+
+ protected static final int CAN_SET_FAULT_MSG = 0x00000004;
+
+ protected static final int CAN_PROVIDER = 0x00000008;
+
+ protected static final int CAN_CONSUMER = 0x00000000;
+
+ protected static final int CAN_SEND = 0x00000010;
+
+ protected static final int CAN_STATUS_ACTIVE = 0x00000040;
+
+ protected static final int CAN_STATUS_DONE = 0x00000080;
+
+ protected static final int CAN_STATUS_ERROR = 0x00000100;
+
+ protected static final int CAN_OWNER = 0x00000200;
+
+ protected static final int STATES_CANS = 0;
+
+ protected static final int STATES_NEXT_OUT = 1;
- protected static final int STATES_CANS = 0;
- protected static final int STATES_NEXT_OUT = 1;
protected static final int STATES_NEXT_FAULT = 2;
+
protected static final int STATES_NEXT_ERROR = 3;
- protected static final int STATES_NEXT_DONE = 4;
-
- public static final String FAULT = "fault";
- public static final String IN = "in";
- public static final String OUT = "out";
-
+
+ protected static final int STATES_NEXT_DONE = 4;
+
private static final long serialVersionUID = -3639175136897005605L;
-
- private static final Log log = LogFactory.getLog(MessageExchangeImpl.class);
-
+
+ private static final Log LOG = LogFactory.getLog(MessageExchangeImpl.class);
+
protected ComponentContextImpl sourceContext;
+
protected ExchangePacket packet;
+
protected PojoMarshaler marshaler;
+
protected int state;
+
protected int syncState = SYNC_STATE_ASYNC;
+
protected int txState = TX_STATE_NONE;
+
protected int[][] states;
+
protected MessageExchangeImpl mirror;
+
protected transient boolean pushDeliver;
+
protected transient Object txLock;
+
protected transient String key;
/**
* Constructor
+ *
* @param exchangeId
* @param pattern
*/
@@ -120,15 +154,15 @@
this.packet.setExchangeId(exchangeId);
this.packet.setPattern(pattern);
}
-
+
protected MessageExchangeImpl(ExchangePacket packet, int[][] states) {
this.states = states;
this.packet = packet;
}
-
+
protected MessageExchangeImpl() {
}
-
+
protected void copyFrom(MessageExchangeImpl me) {
if (this != me) {
this.packet = me.packet;
@@ -137,13 +171,15 @@
this.mirror.state = me.mirror.state;
}
}
-
+
protected boolean can(int c) {
return (this.states[state][STATES_CANS] & c) == c;
}
/**
- * Returns the activation spec that was provided when the component was registered
+ * Returns the activation spec that was provided when the component was
+ * registered
+ *
* @return the spec
*/
public ActivationSpec getActivationSpec() {
@@ -154,7 +190,9 @@
}
/**
- * Returns the context which created the message exchange which can then be used for routing
+ * Returns the context which created the message exchange which can then be
+ * used for routing
+ *
* @return the context
*/
public ComponentContextImpl getSourceContext() {
@@ -163,6 +201,7 @@
/**
* Set the context
+ *
* @param sourceContext
*/
public void setSourceContext(ComponentContextImpl sourceContext) {
@@ -173,10 +212,10 @@
/**
* @return the packet
*/
- public ExchangePacket getPacket(){
+ public ExchangePacket getPacket() {
return packet;
}
-
+
/**
* @return URI of pattern exchange
*/
@@ -203,7 +242,7 @@
/**
* set the processing status
- *
+ *
* @param exchangeStatus
* @throws MessagingException
*/
@@ -217,7 +256,7 @@
/**
* set the source of a failure
- *
+ *
* @param exception
*/
public void setError(Exception exception) {
@@ -243,7 +282,7 @@
/**
* set the fault message for the exchange
- *
+ *
* @param fault
* @throws MessagingException
*/
@@ -261,7 +300,7 @@
/**
* factory method for fault objects
- *
+ *
* @return a new fault
* @throws MessagingException
*/
@@ -271,7 +310,7 @@
/**
* get a NormalizedMessage based on the message reference
- *
+ *
* @param name
* @return a NormalizedMessage
*/
@@ -289,7 +328,7 @@
/**
* set a NormalizedMessage with a named reference
- *
+ *
* @param message
* @param name
* @throws MessagingException
@@ -344,18 +383,18 @@
* @return the property from the exchange
*/
public Object getProperty(String name) {
- if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
- return packet.getTransactionContext();
- } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
- return packet.getPersistent();
- } else {
- return packet.getProperty(name);
- }
+ if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
+ return packet.getTransactionContext();
+ } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
+ return packet.getPersistent();
+ } else {
+ return packet.getProperty(name);
+ }
}
/**
* set a named property on the exchange
- *
+ *
* @param name
* @param value
*/
@@ -366,25 +405,25 @@
if (name == null) {
throw new IllegalArgumentException("name should not be null");
}
- if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
- packet.setTransactionContext((Transaction) value);
- } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
- packet.setPersistent((Boolean) value);
- } else {
- packet.setProperty(name, value);
- }
+ if (JTA_TRANSACTION_PROPERTY_NAME.equals(name)) {
+ packet.setTransactionContext((Transaction) value);
+ } else if (JbiConstants.PERSISTENT_PROPERTY_NAME.equals(name)) {
+ packet.setPersistent((Boolean) value);
+ } else {
+ packet.setProperty(name, value);
+ }
}
-
+
/**
* @return property names
*/
- public Set getPropertyNames(){
+ public Set getPropertyNames() {
return packet.getPropertyNames();
}
/**
* Set an endpoint
- *
+ *
* @param endpoint
*/
public void setEndpoint(ServiceEndpoint endpoint) {
@@ -393,7 +432,7 @@
/**
* set a service
- *
+ *
* @param name
*/
public void setService(QName name) {
@@ -402,16 +441,16 @@
/**
* set an operation
- *
+ *
* @param name
*/
public void setOperation(QName name) {
packet.setOperationName(name);
}
-
+
/**
* set an interface
- *
+ *
* @param name
*/
public void setInterfaceName(QName name) {
@@ -431,7 +470,7 @@
public QName getService() {
return packet.getServiceName();
}
-
+
/**
* @return the interface name
*/
@@ -455,7 +494,7 @@
/**
* set the transaction
- *
+ *
* @param transaction
* @throws MessagingException
*/
@@ -480,72 +519,75 @@
/**
* @return the in message
*/
- public NormalizedMessage getInMessage() {
+ public NormalizedMessage getInMessage() {
return this.packet.getIn();
}
/**
* set the in message
- *
+ *
* @param message
* @throws MessagingException
*/
- public void setInMessage(NormalizedMessage message) throws MessagingException {
+ public void setInMessage(NormalizedMessage message) throws MessagingException {
setMessage(message, IN);
}
/**
* @return the out message
*/
- public NormalizedMessage getOutMessage() {
+ public NormalizedMessage getOutMessage() {
return getMessage(OUT);
}
/**
* set the out message
- *
+ *
* @param message
* @throws MessagingException
*/
- public void setOutMessage(NormalizedMessage message) throws MessagingException {
+ public void setOutMessage(NormalizedMessage message) throws MessagingException {
setMessage(message, OUT);
}
-
+
/**
* @return Returns the sourceId.
*/
public ComponentNameSpace getSourceId() {
return packet.getSourceId();
}
+
/**
- * @param sourceId The sourceId to set.
+ * @param sourceId
+ * The sourceId to set.
*/
public void setSourceId(ComponentNameSpace sourceId) {
packet.setSourceId(sourceId);
}
-
+
/**
* @return Returns the destinationId.
*/
public ComponentNameSpace getDestinationId() {
return packet.getDestinationId();
}
+
/**
- * @param destinationId The destinationId to set.
+ * @param destinationId
+ * The destinationId to set.
*/
public void setDestinationId(ComponentNameSpace destinationId) {
packet.setDestinationId(destinationId);
}
-
+
public Boolean getPersistent() {
- return packet.getPersistent();
+ return packet.getPersistent();
}
-
+
public void setPersistent(Boolean persistent) {
- packet.setPersistent(persistent);
+ packet.setPersistent(persistent);
}
-
public PojoMarshaler getMarshaler() {
if (marshaler == null) {
marshaler = new DefaultMarshaler();
@@ -557,15 +599,15 @@
this.marshaler = marshaler;
}
- public abstract void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
-
+ public abstract void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
+
public void writeExternal(ObjectOutput out) throws IOException {
packet.writeExternal(out);
out.write(state);
out.write(mirror.state);
out.writeBoolean(can(CAN_PROVIDER));
}
-
+
public void handleSend(boolean sync) throws MessagingException {
// Check if send / sendSync is legal
if (!can(CAN_SEND)) {
@@ -636,7 +678,7 @@
public void setSyncState(int syncState) {
this.syncState = syncState;
}
-
+
/**
* @return the txState
*/
@@ -645,7 +687,8 @@
}
/**
- * @param txState the txState to set
+ * @param txState
+ * the txState to set
*/
public void setTxState(int txState) {
this.txState = txState;
@@ -654,11 +697,11 @@
public boolean isPushDelivery() {
return this.pushDeliver;
}
-
+
public void setPushDeliver(boolean b) {
this.pushDeliver = true;
}
-
+
/**
* @return the txLock
*/
@@ -667,7 +710,8 @@
}
/**
- * @param txLock the txLock to set
+ * @param txLock
+ * the txLock to set
*/
public void setTxLock(Object txLock) {
this.txLock = txLock;
@@ -695,7 +739,7 @@
if (getOperation() != null) {
sb.append(" operation: ").append(getOperation()).append('\n');
}
- SourceTransformer st = new SourceTransformer();
+ SourceTransformer st = new SourceTransformer();
display("in", sb, st);
display("out", sb, st);
display("fault", sb, st);
@@ -707,28 +751,24 @@
sb.append("]");
return sb.toString();
} catch (Exception e) {
- log.trace("Error caught in toString", e);
+ LOG.trace("Error caught in toString", e);
return super.toString();
}
}
- public static final int maxMsgDisplaySize = 1500;
-
- public static final boolean preserveContent = Boolean.getBoolean("org.apache.servicemix.preserveContent");
-
private void display(String msg, StringBuffer sb, SourceTransformer st) {
if (getMessage(msg) != null) {
sb.append(" ").append(msg).append(": ");
try {
if (getMessage(msg).getContent() != null) {
- if (preserveContent) {
+ if (PRESERVE_CONTENT) {
sb.append(getMessage(msg).getContent().getClass());
} else {
Node node = st.toDOMNode(getMessage(msg).getContent());
getMessage(msg).setContent(new DOMSource(node));
String str = st.toString(node);
- if (str.length() > maxMsgDisplaySize) {
- sb.append(str.substring(0, maxMsgDisplaySize)).append("...");
+ if (str.length() > MAX_MSG_DISPLAY_SIZE) {
+ sb.append(str.substring(0, MAX_MSG_DISPLAY_SIZE)).append("...");
} else {
sb.append(str);
}
@@ -744,11 +784,9 @@
}
/**
- * Compute a unique key for this exchange proxy.
- * It has to be different for the two sides of the exchange, so
- * we include the role + the exchange id.
- * Obviously, it works, because the role never change for
- * a given proxy.
+ * Compute a unique key for this exchange proxy. It has to be different for
+ * the two sides of the exchange, so we include the role + the exchange id.
+ * Obviously, it works, because the role never changes for a given proxy.
*
* @return
*/
@@ -760,10 +798,9 @@
}
/**
- * Comparator that can be used to sort exchanges according to their
- * "age" in their processing: i.e.: a newly created exchange will be
- * more than a DONE exchange ...
- * If the arguments are not instances of MessageExchangeImpl,
+ * Comparator that can be used to sort exchanges according to their "age" in
+ * their processing: i.e.: a newly created exchange will be more than a DONE
+ * exchange ... If the arguments are not instances of MessageExchangeImpl,
* returns 0.
*/
public static class AgeComparator implements Comparator<MessageExchangeImpl> {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeSupport.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeSupport.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeSupport.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeSupport.java Fri Aug 10 07:37:46 2007
@@ -20,7 +20,7 @@
/**
* Resolver for URI patterns
- *
+ *
* @version $Revision$
*/
public class MessageExchangeSupport {
@@ -28,64 +28,77 @@
* In Only MEP.
*/
public static final URI IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/in-only");
+
/**
* In Out MEP.
*/
public static final URI IN_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-out");
+
/**
* In Optional Out MEP.
*/
public static final URI IN_OPTIONAL_OUT = URI.create("http://www.w3.org/2004/08/wsdl/in-opt-out");
+
/**
* Robust In Only MEP.
*/
public static final URI ROBUST_IN_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-in-only");
+
/**
* Out Only MEP.
*/
public static final URI OUT_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/out-only");
+
/**
* Out In MEP.
*/
public static final URI OUT_IN = URI.create("http://www.w3.org/2004/08/wsdl/out-in");
+
/**
* Out Optional In MEP.
*/
public static final URI OUT_OPTIONAL_IN = URI.create("http://www.w3.org/2004/08/wsdl/out-opt-in");
+
/**
* Robust Out Only MEP.
*/
public static final URI ROBUST_OUT_ONLY = URI.create("http://www.w3.org/2004/08/wsdl/robust-out-only");
-
/**
* In Only MEP.
*/
public static final URI WSDL2_IN_ONLY = URI.create("http://www.w3.org/2006/01/wsdl/in-only");
+
/**
* In Out MEP.
*/
public static final URI WSDL2_IN_OUT = URI.create("http://www.w3.org/2006/01/wsdl/in-out");
+
/**
* In Optional Out MEP.
*/
public static final URI WSDL2_IN_OPTIONAL_OUT = URI.create("http://www.w3.org/2006/01/wsdl/in-opt-out");
+
/**
* Robust In Only MEP.
*/
public static final URI WSDL2_ROBUST_IN_ONLY = URI.create("http://www.w3.org/2006/01/wsdl/robust-in-only");
+
/**
* Out Only MEP.
*/
public static final URI WSDL2_OUT_ONLY = URI.create("http://www.w3.org/2006/01/wsdl/out-only");
+
/**
* Out In MEP.
*/
public static final URI WSDL2_OUT_IN = URI.create("http://www.w3.org/2006/01/wsdl/out-in");
+
/**
* Out Optional In MEP.
*/
public static final URI WSDL2_OUT_OPTIONAL_IN = URI.create("http://www.w3.org/2006/01/wsdl/out-opt-in");
+
/**
* Robust Out Only MEP.
*/
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/NormalizedMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/NormalizedMessageImpl.java?view=diff&rev=564607&r1=564606&r2=564607
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/NormalizedMessageImpl.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/NormalizedMessageImpl.java Fri Aug 10 07:37:46 2007
@@ -50,41 +50,43 @@
/**
* Represents a JBI NormalizedMessage.
- *
+ *
* @version $Revision$
*/
public class NormalizedMessageImpl implements NormalizedMessage, Externalizable, Message {
-
+
private static final long serialVersionUID = 9179194301410526549L;
-
+
+ private static final SourceTransformer TRANSFORMER = new SourceTransformer();
+
protected transient MessageExchangeImpl exchange;
+
private transient Source content;
+
private transient Object body;
+
private Subject securitySubject;
+
private Map properties;
- private Map attachments;
- private static SourceTransformer transformer = new SourceTransformer();
+ private Map attachments;
/**
* Constructor
- *
+ *
*/
public NormalizedMessageImpl() {
}
-
/**
* Constructor
+ *
* @param exchange
*/
public NormalizedMessageImpl(MessageExchangeImpl exchange) {
this.exchange = exchange;
}
-
-
-
/**
* @return the content of the message
*/
@@ -92,8 +94,7 @@
if (content == null && body != null) {
try {
getMarshaler().marshal(exchange, this, body);
- }
- catch (MessagingException e) {
+ } catch (MessagingException e) {
throw new RuntimeJBIException(e);
}
}
@@ -102,7 +103,7 @@
/**
* set the content of the message
- *
+ *
* @param source
*/
public void setContent(Source source) {
@@ -118,7 +119,7 @@
/**
* set the security subject
- *
+ *
* @param securitySubject
*/
public void setSecuritySubject(Subject securitySubject) {
@@ -127,7 +128,7 @@
/**
* get a named property
- *
+ *
* @param name
* @return a property from the message
*/
@@ -150,7 +151,7 @@
/**
* set a property
- *
+ *
* @param name
* @param value
*/
@@ -166,17 +167,17 @@
/**
* Add an attachment
- *
+ *
* @param id
- * @param content
+ * @param handler
*/
- public void addAttachment(String id, DataHandler content) {
- getAttachments().put(id, content.getDataSource());
+ public void addAttachment(String id, DataHandler handler) {
+ getAttachments().put(id, handler.getDataSource());
}
/**
* Get a named attachment
- *
+ *
* @param id
* @return the specified attachment
*/
@@ -199,7 +200,7 @@
/**
* remove an identified attachment
- *
+ *
* @param id
*/
public void removeAttachment(String id) {
@@ -207,25 +208,26 @@
attachments.remove(id);
}
}
-
- /** Returns a list of identifiers for each attachment to the message.
- * @return iterator over String attachment identifiers
+
+ /**
+ * Returns the identifiers of every attachment on the message.
+ *
+ * @return unmodifiable set of String attachment identifiers
*/
- public Set getAttachmentNames(){
- if (attachments != null){
+ public Set getAttachmentNames() {
+ if (attachments != null) {
return Collections.unmodifiableSet(attachments.keySet());
}
return Collections.EMPTY_SET;
}
-
/**
 * Builds a textual form of this message: the default
 * <code>Object</code>-style description followed by the message properties
 * wrapped in braces.
 *
 * @return the string representation of this message
 */
public String toString() {
    StringBuilder text = new StringBuilder(super.toString());
    text.append("{properties: ").append(getProperties()).append("}");
    return text.toString();
}
-
+
// Scripting helper methods to add expressive power
// when using languages like Groovy, Velocity etc
- //-------------------------------------------------------------------------
+ // -------------------------------------------------------------------------
public Object getBody() throws MessagingException {
if (body == null) {
@@ -243,7 +245,7 @@
}
public String getBodyText() throws TransformerException {
- return transformer.toString(getContent());
+ return TRANSFORMER.toString(getContent());
}
public void setBodyText(String xml) {
@@ -262,9 +264,8 @@
return getExchange().createFault();
}
-
// Implementation methods
- //-------------------------------------------------------------------------
+ // -------------------------------------------------------------------------
protected Map getProperties() {
if (properties == null) {
properties = createPropertiesMap();
@@ -299,6 +300,7 @@
/**
* Write to a Stream
+ *
* @param out
* @throws IOException
*/
@@ -307,16 +309,13 @@
convertAttachments();
out.writeObject(attachments);
out.writeObject(properties);
- String src = transformer.toString(content);
+ String src = TRANSFORMER.toString(content);
out.writeObject(src);
// We have read the source
// so now, ensure that it can be re-read
- if ((content instanceof StreamSource ||
- content instanceof SAXSource) &&
- !(content instanceof StringSource) &&
- !(content instanceof BytesSource) &&
- !(content instanceof ResourceSource)) {
- content = new StringSource(src);
+ if ((content instanceof StreamSource || content instanceof SAXSource) && !(content instanceof StringSource)
+ && !(content instanceof BytesSource) && !(content instanceof ResourceSource)) {
+ content = new StringSource(src);
}
} catch (TransformerException e) {
throw (IOException) new IOException("Could not transform content to string").initCause(e);
@@ -360,4 +359,3 @@
}
}
-