You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/20 13:08:51 UTC
svn commit: r756427 - in /servicemix/smx4/nmr/trunk/jbi/cluster:
engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/
requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/
Author: gnodet
Date: Fri Mar 20 12:08:51 2009
New Revision: 756427
URL: http://svn.apache.org/viewvc?rev=756427&view=rev
Log:
SMX4NMR-128: add some synchronizations around the requestor to avoid it being used concurrently by multiple threads
Modified:
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java Fri Mar 20 12:08:51 2009
@@ -554,23 +554,31 @@
//-------------------------------------------------------------------------
public void process(Exchange exchange) {
- JmsRequestor item = null;
try {
String corrId = (String) exchange.getProperty(PROPERTY_CORR_ID + "." + name);
if (corrId != null) {
- item = pool.resume(corrId);
+ JmsRequestor item = pool.resume(corrId);
+ synchronized (item) {
+ try {
+ processExchange(item, exchange);
+ } finally {
+ item.close();
+ }
+ }
} else {
- item = pool.newRequestor();
- item.begin();
+ JmsRequestor item = pool.newRequestor();
+ synchronized (item) {
+ try {
+ item.begin();
+ processExchange(item, exchange);
+ } finally {
+ item.close();
+ }
+ }
}
- processExchange(item, exchange);
} catch (Exception e) {
// TODO what if the problem is a JMS exception or related
fail(exchange, e);
- } finally {
- if (item != null) {
- item.close();
- }
}
}
@@ -705,171 +713,173 @@
* @throws Exception if an error occurs
*/
protected void processExchange(JmsRequestor requestor, Exchange exchange) throws Exception {
- decrementPendingExchangeIfNeeded(exchange);
- boolean rollbackOnErrors;
- if (exchange.getRole() == Role.Consumer) {
- rollbackOnErrors = Boolean.TRUE.equals(exchange.getProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name));
- } else {
- rollbackOnErrors = this.rollbackOnErrors;
- }
- if (exchange.getStatus() == Status.Active) {
- Message msg = exchange.getFault(false);
- int type;
- if (msg != null) {
- type = JBI_MESSAGE_FAULT;
+ synchronized (requestor) {
+ decrementPendingExchangeIfNeeded(exchange);
+ boolean rollbackOnErrors;
+ if (exchange.getRole() == Role.Consumer) {
+ rollbackOnErrors = Boolean.TRUE.equals(exchange.getProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name));
} else {
- msg = exchange.getOut(false);
+ rollbackOnErrors = this.rollbackOnErrors;
+ }
+ if (exchange.getStatus() == Status.Active) {
+ Message msg = exchange.getFault(false);
+ int type;
if (msg != null) {
- type = JBI_MESSAGE_OUT;
+ type = JBI_MESSAGE_FAULT;
} else {
- msg = exchange.getIn(false);
+ msg = exchange.getOut(false);
if (msg != null) {
- type = JBI_MESSAGE_IN;
+ type = JBI_MESSAGE_OUT;
} else {
- throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
+ msg = exchange.getIn(false);
+ if (msg != null) {
+ type = JBI_MESSAGE_IN;
+ } else {
+ throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
+ }
}
}
- }
- javax.jms.Message message = requestor.getSession().createObjectMessage(msg);
- message.setIntProperty(JBI_MESSAGE, type);
- if (type == JBI_MESSAGE_IN) {
- rollbackOnErrors = this.rollbackOnErrors;
- exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, rollbackOnErrors);
- message.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());
- if (exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP) != null) {
- message.setStringProperty(JBI_INTERFACE, exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP).toString());
- }
- if (exchange.getOperation() != null) {
- message.setStringProperty(JBI_OPERATION, exchange.getOperation().toString());
- }
- if (exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP) != null) {
- message.setStringProperty(JBI_SERVICE, exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP).toString());
- }
- if (exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP) != null) {
- ServiceEndpoint se = (ServiceEndpoint) exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
- message.setStringProperty(JBI_ENDPOINT, "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
- }
- // TODO: write exchange properties
- }
- message.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollbackOnErrors);
- boolean expectResponse;
- if (!rollbackOnErrors) {
- expectResponse = true;
- } else {
- switch (exchange.getPattern()) {
- case InOnly:
- expectResponse = false;
- break;
- case RobustInOnly:
- expectResponse = exchange.getRole() == Role.Provider;
- break;
- case InOut:
- expectResponse = exchange.getRole() == Role.Provider;
- break;
- default:
- // TODO:
- expectResponse = true;
- break;
- }
- }
- if (expectResponse) {
- exchanges.put(exchange.getId(), exchange);
- message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
- message.setStringProperty(PROPERTY_SENDER_CORR_ID, exchange.getId());
- if (requestor.getMessage() != null) {
- message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
- message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ javax.jms.Message message = requestor.getSession().createObjectMessage(msg);
+ message.setIntProperty(JBI_MESSAGE, type);
+ if (type == JBI_MESSAGE_IN) {
+ rollbackOnErrors = this.rollbackOnErrors;
+ exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, rollbackOnErrors);
+ message.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());
+ if (exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP) != null) {
+ message.setStringProperty(JBI_INTERFACE, exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP).toString());
+ }
+ if (exchange.getOperation() != null) {
+ message.setStringProperty(JBI_OPERATION, exchange.getOperation().toString());
+ }
+ if (exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP) != null) {
+ message.setStringProperty(JBI_SERVICE, exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP).toString());
+ }
+ if (exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP) != null) {
+ ServiceEndpoint se = (ServiceEndpoint) exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
+ message.setStringProperty(JBI_ENDPOINT, "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
+ }
+ // TODO: write exchange properties
}
- requestor.send(message);
- } else {
- message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
- message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
- if (requestor.getMessage() != null) {
- message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
- message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ message.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollbackOnErrors);
+ boolean expectResponse;
+ if (!rollbackOnErrors) {
+ expectResponse = true;
+ } else {
+ switch (exchange.getPattern()) {
+ case InOnly:
+ expectResponse = false;
+ break;
+ case RobustInOnly:
+ expectResponse = exchange.getRole() == Role.Provider;
+ break;
+ case InOut:
+ expectResponse = exchange.getRole() == Role.Provider;
+ break;
+ default:
+ // TODO:
+ expectResponse = true;
+ break;
+ }
}
- requestor.send(message);
- // TODO: send done in the tx synchronization
- done(exchange);
- }
- } else if (exchange.getStatus() == Status.Done) {
- boolean doSend;
- if (!rollbackOnErrors) {
- doSend = true;
- } else {
- switch (exchange.getPattern()) {
- case InOnly:
- // never send done for InOnly
- doSend = false;
- break;
- case RobustInOnly:
- // only send done when there is no fault
- // which means the exchange has a consumer role
- doSend = exchange.getRole() == Role.Consumer;
- break;
- case InOptionalOut:
- // TODO
- doSend = true;
- break;
- case InOut:
- // in an InOut mep, the DONE status always come from the JBI consumer
- doSend = false;
- break;
- default:
- throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+ if (expectResponse) {
+ exchanges.put(exchange.getId(), exchange);
+ message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+ message.setStringProperty(PROPERTY_SENDER_CORR_ID, exchange.getId());
+ if (requestor.getMessage() != null) {
+ message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+ message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ }
+ requestor.send(message);
+ } else {
+ message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+ message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+ if (requestor.getMessage() != null) {
+ message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+ message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ }
+ requestor.send(message);
+ // TODO: send done in the tx synchronization
+ done(exchange);
+ }
+ } else if (exchange.getStatus() == Status.Done) {
+ boolean doSend;
+ if (!rollbackOnErrors) {
+ doSend = true;
+ } else {
+ switch (exchange.getPattern()) {
+ case InOnly:
+ // never send done for InOnly
+ doSend = false;
+ break;
+ case RobustInOnly:
+ // only send done when there is no fault
+ // which means the exchange has a consumer role
+ doSend = exchange.getRole() == Role.Consumer;
+ break;
+ case InOptionalOut:
+ // TODO
+ doSend = true;
+ break;
+ case InOut:
+ // in an InOut mep, the DONE status always come from the JBI consumer
+ doSend = false;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+ }
}
- }
- if (doSend) {
- javax.jms.Message message = requestor.getSession().createMessage();
- message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
- message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
- message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
- if (requestor.getMessage() != null) {
- message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
- message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ if (doSend) {
+ javax.jms.Message message = requestor.getSession().createMessage();
+ message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
+ message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+ message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+ if (requestor.getMessage() != null) {
+ message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+ message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ }
+ requestor.send(message);
}
- requestor.send(message);
- }
- } else if (exchange.getStatus() == Status.Error) {
- boolean doSend;
- if (!rollbackOnErrors) {
- doSend = true;
- } else {
- switch (exchange.getPattern()) {
- case InOnly:
- // never send errors for InOnly
- doSend = false;
- break;
- case RobustInOnly:
- // do not send exchange from the provider back to the consumer
- doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
- break;
- case InOptionalOut:
- // TODO
- doSend = true;
- break;
- case InOut:
- doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
- break;
- default:
- throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+ } else if (exchange.getStatus() == Status.Error) {
+ boolean doSend;
+ if (!rollbackOnErrors) {
+ doSend = true;
+ } else {
+ switch (exchange.getPattern()) {
+ case InOnly:
+ // never send errors for InOnly
+ doSend = false;
+ break;
+ case RobustInOnly:
+ // do not send exchange from the provider back to the consumer
+ doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
+ break;
+ case InOptionalOut:
+ // TODO
+ doSend = true;
+ break;
+ case InOut:
+ doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+ }
}
- }
- if (doSend) {
- javax.jms.Message message = requestor.getSession().createObjectMessage(exchange.getError());
- message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
- message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
- message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
- if (requestor.getMessage() != null) {
- message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
- message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ if (doSend) {
+ javax.jms.Message message = requestor.getSession().createObjectMessage(exchange.getError());
+ message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
+ message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+ message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+ if (requestor.getMessage() != null) {
+ message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+ message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+ }
+ requestor.send(message);
+ } else {
+ requestor.setRollbackOnly();
}
- requestor.send(message);
} else {
- requestor.setRollbackOnly();
+ throw new IllegalStateException("Unknown exchange status: " + exchange);
}
- } else {
- throw new IllegalStateException("Unknown exchange status: " + exchange);
}
}
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java Fri Mar 20 12:08:51 2009
@@ -102,7 +102,9 @@
public JmsRequestor resume(String id) {
Requestor requestor = parked.remove(id);
- requestor.resume();
+ synchronized (requestor) {
+ requestor.resume();
+ }
return requestor;
}
@@ -131,7 +133,7 @@
* Retrieve the jms session
* @return the session
*/
- public Session getSession() throws JmsException {
+ public synchronized Session getSession() throws JmsException {
try {
if (session == null) {
Connection conToUse;
@@ -161,7 +163,7 @@
* @return the JMS message
* @throws javax.jms.JMSException if an error occurs
*/
- public Message receive(long timeout) throws JMSException {
+ public synchronized Message receive(long timeout) throws JMSException {
if (timeout < 0) {
message = getConsumer().receive();
} else {
@@ -188,7 +190,7 @@
* @param msg the message to send
* @throws javax.jms.JMSException if an error occurs
*/
- public void send(Message msg) throws JmsException {
+ public synchronized void send(Message msg) throws JmsException {
if (logger.isDebugEnabled()) {
logger.debug("Sending JMS message: " + msg);
}
@@ -217,7 +219,7 @@
* the item is no longer used. If an item has been parked previously,
* this call will do nothing.
*/
- public void close() {
+ public synchronized void close() {
if (session != null) {
if (!suspended) {
try {
@@ -274,7 +276,7 @@
* This should be called when the same session need to be reused at a later time.
* @param id the parking id
*/
- public void suspend(String id) {
+ public synchronized void suspend(String id) {
if (transacted == Transacted.Xa) {
try {
if (logger.isDebugEnabled()) {
@@ -317,7 +319,7 @@
* Mark this item as no longer parked so that it can later be
* returned to the pool by a call to {@link #close()}.
*/
- protected void resume() {
+ protected synchronized void resume() {
if (transaction != null) {
try {
if (logger.isDebugEnabled()) {
@@ -340,7 +342,7 @@
* Internal use only.
* Prepare this item to be used again.
*/
- public void begin() throws JmsException {
+ public synchronized void begin() throws JmsException {
startXaTransaction();
getSession();
}
@@ -364,7 +366,7 @@
* This means that the transaction will be rolled back instead of
* being committed.
*/
- public void setRollbackOnly() {
+ public synchronized void setRollbackOnly() {
rollbackOnly = true;
if (transacted == Transacted.Xa) {
try {
Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java Fri Mar 20 12:08:51 2009
@@ -462,29 +462,30 @@
}
protected boolean invokeListener() throws Exception {
- Requestor req = null;
boolean messageReceived = false;
- try {
- req = createRequestor(true);
- req.begin();
- messageReceived = req.receive(receiveTimeout) != null;
- if (messageReceived) {
- if (logger.isDebugEnabled()) {
- logger.debug("Received message of type [" + req.getMessage().getClass() + "] from consumer");
+ Requestor req = createRequestor(true);
+ synchronized (req) {
+ try {
+ req.begin();
+ messageReceived = req.receive(receiveTimeout) != null;
+ if (messageReceived) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received message of type [" + req.getMessage().getClass() + "] from consumer");
+ }
+ messageReceived(this, req.getSession());
+ listener.onMessage(req);
+ lastMessageSucceeded = true;
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Consumer did not receive a message");
+ }
+ noMessageReceived(this, req.getSession());
+ lastMessageSucceeded = true;
}
- messageReceived(this, req.getSession());
- listener.onMessage(req);
- lastMessageSucceeded = true;
- } else {
- if (logger.isTraceEnabled()) {
- logger.trace("Consumer did not receive a message");
+ } finally {
+ if (req != null) {
+ req.close();
}
- noMessageReceived(this, req.getSession());
- lastMessageSucceeded = true;
- }
- } finally {
- if (req != null) {
- req.close();
}
}
return messageReceived;