You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2008/07/31 14:38:37 UTC
svn commit: r681358 - in /webservices/sandesha/trunk/java/modules:
core/src/main/java/org/apache/sandesha2/msgprocessors/
core/src/main/java/org/apache/sandesha2/storage/inmemory/
tests/src/test/java/org/apache/sandesha2/storage/
Author: mckierna
Date: Thu Jul 31 05:38:36 2008
New Revision: 681358
URL: http://svn.apache.org/viewvc?rev=681358&view=rev
Log:
See https://issues.apache.org/jira/browse/SANDESHA2-172, thanks Dave
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Thu Jul 31 05:38:36 2008
@@ -181,12 +181,13 @@
// offered seq id
String offeredSequenceID = offer.getIdentifer().getIdentifier();
- boolean offerAccepted = offerAccepted(offeredSequenceID, context, createSeqRMMsg, storageManager);
-
+ boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
+ boolean offerAccepted = true;
+
RMSBean rMSBean = null;
//Before processing this offer any further we need to perform some extra checks
//on the offered EP if WS-RM Spec 1.1 is being used
- if(offerAccepted && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
+ if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
Endpoint endpoint = offer.getEndpoint();
if (endpoint!=null) {
//Check to see if the offer endpoint has a value of WSA Anonymous
@@ -210,20 +211,28 @@
log.debug("Offer Refused as it included a null endpoint");
offerAccepted = false;
}
- } else if (offerAccepted && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
+ } else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
rMSBean = new RMSBean();
}
- if (offerAccepted) {
+ String outgoingSideInternalSequenceId = SandeshaUtil
+ .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
+ if(isValidseqID){
// Setting the CreateSequence table entry for the outgoing
// side.
- rMSBean.setSequenceID(offeredSequenceID);
- String outgoingSideInternalSequenceId = SandeshaUtil
- .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+ rMSBean.setSequenceID(offeredSequenceID);
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
// this is a dummy value
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
+ //Try inserting the new RMSBean
+ if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
+ offerAccepted = false;
+ }
+ }
+
+ if (offerAccepted) {
if(rmdBean.getToEndpointReference() != null){
rMSBean.setToEndpointReference(rmdBean.getToEndpointReference());
} else {
@@ -263,7 +272,7 @@
// Set the SOAP Version for this sequence.
rMSBean.setSoapVersion(SandeshaUtil.getSOAPVersion(createSeqRMMsg.getSOAPEnvelope()));
- storageManager.getRMSBeanMgr().insert(rMSBean);
+ storageManager.getRMSBeanMgr().update(rMSBean);
SandeshaUtil.startWorkersForSequence(context, rMSBean);
@@ -371,34 +380,25 @@
return true;
}
- private boolean offerAccepted(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
+ private boolean isValidseqID(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
StorageManager storageManager) throws SandeshaException {
if (log.isDebugEnabled())
- log.debug("Enter: CreateSeqMsgProcessor::offerAccepted, " + sequenceId);
+ log.debug("Enter: CreateSeqMsgProcessor::isValidseqID, " + sequenceId);
if ("".equals(sequenceId)) {
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + false);
- return false;
- }
-
- RMSBean createSeqFindBean = new RMSBean();
- createSeqFindBean.setSequenceID(sequenceId);
- Collection arr = storageManager.getRMSBeanMgr().find(createSeqFindBean);
-
- if (arr.size() > 0) {
- if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + false);
+ log.debug("Exit: CreateSeqMsgProcessor::isValidseqID, " + false);
return false;
}
+
if (sequenceId.length() <= 1) {
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + false);
+ log.debug("Exit: CreateSeqMsgProcessor::isValidseqID, " + false);
return false; // Single character offers are NOT accepted.
}
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::offerAccepted, " + true);
+ log.debug("Exit: CreateSeqMsgProcessor::isValidseqID, " + true);
return true;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Jul 31 05:38:36 2008
@@ -46,6 +46,7 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -132,7 +133,7 @@
"Existing id:" + rmsBean.getSequenceID() + ", new id:" + newOutSequenceId);
return false;
}
-
+
// Store the new sequence id
rmsBean.setSequenceID(newOutSequenceId);
@@ -147,110 +148,119 @@
}
}
}
-
- // Get the CreateSeqBean based on the message id to take a lock on the bean
- SenderBean createSeqBean = retransmitterMgr.retrieve(createSeqMsgId);
-
- // deleting the create sequence sender bean entry.
- retransmitterMgr.delete(createSeqBean.getMessageID());
+
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
- // Remove the create sequence message
- storageManager.removeMessageContext(rmsBean.getCreateSequenceMsgStoreKey());
-
- // processing for accept (offer has been sent)
- Accept accept = createSeqResponsePart.getAccept();
- if (accept != null) {
-
- // TODO this should be detected in the Fault manager.
- if (rmsBean.getOfferedSequence() == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
- log.debug(message);
- throw new SandeshaException(message);
+ if(!rmsBeanMgr.update(rmsBean)){
+ //Im not setting the createSeqBean sender bean to resend true as the reallocation of msgs will do this
+ try{
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, true);
+ } catch(Exception e){
+ if (log.isDebugEnabled())
+ log.debug(e);
}
+ } else {
+ // processing for accept (offer has been sent)
+ Accept accept = createSeqResponsePart.getAccept();
+ if (accept != null) {
+
+ // TODO this should be detected in the Fault manager.
+ if (rmsBean.getOfferedSequence() == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
- RMDBean rMDBean = new RMDBean();
-
- EndpointReference acksToEPR = accept.getAcksTo().getEPR();
- rMDBean.setAcksToEndpointReference(acksToEPR);
- rMDBean.setSequenceID(rmsBean.getOfferedSequence());
- rMDBean.setNextMsgNoToProcess(1);
- rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
+ RMDBean rMDBean = new RMDBean();
+
+ EndpointReference acksToEPR = accept.getAcksTo().getEPR();
+ rMDBean.setAcksToEndpointReference(acksToEPR);
+ rMDBean.setSequenceID(rmsBean.getOfferedSequence());
+ rMDBean.setNextMsgNoToProcess(1);
+ rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
- rMDBean.setServiceName(createSeqResponseRMMsgCtx.getMessageContext().getAxisService().getName());
-
- //Storing the referenceMessage of the sending side sequence as the reference message
- //of the receiving side as well.
- //This can be used when creating new outgoing messages.
-
- String referenceMsgStoreKey = rmsBean.getReferenceMessageStoreKey();
- MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
-
- String newMessageStoreKey = SandeshaUtil.getUUID();
- storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
-
- rMDBean.setReferenceMessageKey(newMessageStoreKey);
+ rMDBean.setServiceName(createSeqResponseRMMsgCtx.getMessageContext().getAxisService().getName());
+
+ //Storing the referenceMessage of the sending side sequence as the reference message
+ //of the receiving side as well.
+ //This can be used when creating new outgoing messages.
+
+ String referenceMsgStoreKey = rmsBean.getReferenceMessageStoreKey();
+ MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+
+ String newMessageStoreKey = SandeshaUtil.getUUID();
+ storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
+
+ rMDBean.setReferenceMessageKey(newMessageStoreKey);
- // If this is an offered sequence that needs polling then we need to setup the
- // rmdBean for polling too, so that it still gets serviced after the outbound
- // sequence terminates.
- if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
- if(rmsBean.isPollingMode()) {
- rMDBean.setPollingMode(true);
+ // If this is an offered sequence that needs polling then we need to setup the
+ // rmdBean for polling too, so that it still gets serviced after the outbound
+ // sequence terminates.
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+ if(rmsBean.isPollingMode()) {
+ rMDBean.setPollingMode(true);
+ }
}
- }
-
- String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
- rMDBean.setRMVersion(rmSpecVersion);
-
- EndpointReference toEPR = createSeqResponseRMMsgCtx.getTo();
- if (toEPR==null) {
- //Most probably this is a sync response message, using the replyTo of the request message
- OperationContext operationContext = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
- if (operationContext!=null) {
- MessageContext createSequnceMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
- if (createSequnceMessage!=null)
- toEPR = createSequnceMessage.getReplyTo();
+
+ String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+ rMDBean.setRMVersion(rmSpecVersion);
+
+ EndpointReference toEPR = createSeqResponseRMMsgCtx.getTo();
+ if (toEPR==null) {
+ //Most probably this is a sync response message, using the replyTo of the request message
+ OperationContext operationContext = createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+ if (operationContext!=null) {
+ MessageContext createSequnceMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+ if (createSequnceMessage!=null)
+ toEPR = createSequnceMessage.getReplyTo();
+ }
}
- }
-
- if (toEPR!=null)
- rMDBean.setToAddress(toEPR.getAddress());
-
- rMDBean.setServerCompletedMessages(new RangeString());
- RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+
+ if (toEPR!=null)
+ rMDBean.setToAddress(toEPR.getAddress());
+
+ rMDBean.setServerCompletedMessages(new RangeString());
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
- // Store the security token for the offered sequence
- rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
-
- rMDBean.setLastActivatedTime(System.currentTimeMillis());
+ // Store the security token for the offered sequence
+ rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
+
+ rMDBean.setLastActivatedTime(System.currentTimeMillis());
+
+ rmdBeanMgr.insert(rMDBean);
+ SandeshaUtil.startWorkersForSequence(configCtx, rMDBean);
+ }
- rmdBeanMgr.insert(rMDBean);
- SandeshaUtil.startWorkersForSequence(configCtx, rMDBean);
+ // Get the CreateSeqBean based on the message id to take a lock on the bean
+ SenderBean createSeqBean = retransmitterMgr.retrieve(createSeqMsgId);
+
+ // deleting the create sequence sender bean entry.
+ retransmitterMgr.delete(createSeqBean.getMessageID());
+
+ // Remove the create sequence message
+ storageManager.removeMessageContext(rmsBean.getCreateSequenceMsgStoreKey());
+ SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+
+ // Locate and update all of the messages for this sequence, now that we know
+ // the sequence id.
+ SenderBean target = new SenderBean();
+ target.setInternalSequenceID(internalSequenceId);
+ target.setSend(false);
+
+ Iterator iterator = retransmitterMgr.find(target).iterator();
+ while (iterator.hasNext()) {
+ SenderBean tempBean = (SenderBean) iterator.next();
+
+ // asking to send the application msssage
+ tempBean.setSend(true);
+ tempBean.setSequenceID(newOutSequenceId);
+ retransmitterMgr.update(tempBean);
+ }
+
+ // TODO - does this do anything?
+ createSeqResponseRMMsgCtx.pause();
}
- rmsBean.setLastActivatedTime(System.currentTimeMillis());
- rmsBeanMgr.update(rmsBean);
- SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
-
- // Locate and update all of the messages for this sequence, now that we know
- // the sequence id.
- SenderBean target = new SenderBean();
- target.setInternalSequenceID(internalSequenceId);
- target.setSend(false);
-
- Iterator iterator = retransmitterMgr.find(target).iterator();
- while (iterator.hasNext()) {
- SenderBean tempBean = (SenderBean) iterator.next();
-
- // asking to send the application msssage
- tempBean.setSend(true);
- tempBean.setSequenceID(newOutSequenceId);
- retransmitterMgr.update(tempBean);
- }
-
- // TODO - does this do anything?
- createSeqResponseRMMsgCtx.pause();
-
if (log.isDebugEnabled())
log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
return true;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java Thu Jul 31 05:38:36 2008
@@ -26,6 +26,9 @@
import org.apache.axis2.context.AbstractContext;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -36,27 +39,46 @@
private ConcurrentHashMap seqID2csm = new ConcurrentHashMap();
private ConcurrentHashMap intSeqID2csm = new ConcurrentHashMap();
+ private ConcurrentHashMap inUseSeqIDs = new ConcurrentHashMap();
public InMemoryRMSBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
super(mgr, context, Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
}
+
+ private boolean isSeqIDUsable(String seqID, String createSeqMsgID, boolean isInsert){
+ boolean isUsable = true;
+ if(seqID != null) {
+ Object o = inUseSeqIDs.putIfAbsent(seqID, createSeqMsgID);
+
+ if(isInsert && o!= null){
+ isUsable = false;
+ }
+
+ if(o != null && !o.equals(createSeqMsgID)){
+ isUsable = false;
+ }
+ }
+ return isUsable;
+ }
public boolean insert(RMSBean bean) throws SandeshaStorageException {
boolean res = false;
lock.lock();
- if(intSeqID2csm.get(bean.getInternalSequenceID())==null){
- res = super.insert(bean.getCreateSeqMsgID(), bean);
- if(res){
- if(bean.getInternalSequenceID()!=null){
- intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
- }
- if(bean.getSequenceID()!=null){
- seqID2csm.put(bean.getSequenceID(), bean.getCreateSeqMsgID());
+ if(isSeqIDUsable(bean.getSequenceID(), bean.getCreateSeqMsgID(), true)){
+ if(intSeqID2csm.get(bean.getInternalSequenceID())==null){
+ res = super.insert(bean.getCreateSeqMsgID(), bean);
+ if(res){
+ if(bean.getInternalSequenceID()!=null){
+ intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
+ }
+ if(bean.getSequenceID()!=null){
+ seqID2csm.put(bean.getSequenceID(), bean.getCreateSeqMsgID());
+ }
}
- }
- }
- lock.unlock();
+ }
+ }
+ lock.unlock();
return res;
}
@@ -65,7 +87,8 @@
RMSBean removed = (RMSBean) super.delete(msgId);
if(removed!=null){
seqID2csm.remove(removed.getSequenceID());
- intSeqID2csm.remove(removed.getInternalSequenceID());
+ intSeqID2csm.remove(removed.getInternalSequenceID());
+ inUseSeqIDs.remove(removed.getSequenceID());
}
return removed!=null;
@@ -76,15 +99,19 @@
}
public boolean update(RMSBean bean) throws SandeshaStorageException {
- boolean result = super.update(bean.getCreateSeqMsgID(), bean);
- if(bean.getInternalSequenceID()!=null){
- intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
- }
- if(bean.getSequenceID()!=null){
- seqID2csm.put(bean.getSequenceID(), bean.getCreateSeqMsgID());
- }
- return result;
+ boolean result = false;
+ if(isSeqIDUsable(bean.getSequenceID(), bean.getCreateSeqMsgID(), false)){
+ result = super.update(bean.getCreateSeqMsgID(), bean);
+ if(bean.getInternalSequenceID()!=null){
+ intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
+ }
+ if(bean.getSequenceID()!=null){
+ seqID2csm.put(bean.getSequenceID(), bean.getCreateSeqMsgID());
+ }
+ }
+
+ return result;
}
public List find(RMSBean bean) throws SandeshaStorageException {
Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java (original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java Thu Jul 31 05:38:36 2008
@@ -82,22 +82,18 @@
createSeqBean2.setCreateSeqMsgID("CreateSeqMsgId2");
createSeqBean2.setSequenceID("SeqId1");
- mgr.insert(createSeqBean1);
- mgr.insert(createSeqBean2);
+ assertTrue(mgr.insert(createSeqBean1));
+
+ //This RMSBean won't get added
+ //as we protect against adding two RMSBeans with identical Seq ID's
+ assertFalse(mgr.insert(createSeqBean2));
RMSBean target = new RMSBean();
target.setSequenceID("SeqId1");
Iterator iter = mgr.find(target).iterator();
RMSBean tmp = (RMSBean) iter.next();
- if (tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1")) {
- tmp = (RMSBean) iter.next();
- assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId2"));
-
- } else {
- tmp = (RMSBean) iter.next();
- assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1"));
- }
+ assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1"));
}
public void testInsert() throws SandeshaStorageException{
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org