You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/11 06:31:52 UTC
svn commit: r462697 [2/3] - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: handlers/ i18n/
msgprocessors/ storage/beans/ storage/inmemory/ transport/ util/ workers/
wsrm/
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,323 +1,320 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.RelatesTo;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.NextMsgBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.Accept;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CreateSequenceResponse;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Responsible for processing an incoming Create Sequence Response message.
- */
-
-public class CreateSeqResponseMsgProcessor implements MsgProcessor {
-
- private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
-
- public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
-
- ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
- .getAxisConfiguration());
-
- // Processing the create sequence response.
-
- CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
- if (createSeqResponsePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
- if (newOutSequenceId == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
- if (relatesTo == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
- log.error(message);
- throw new SandeshaException(message);
- }
- String createSeqMsgId = relatesTo.getValue();
-
- SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
- CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-
- CreateSeqBean createSeqBean = createSeqMgr.retrieve(createSeqMsgId);
- if (createSeqBean == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- // Check that the create sequence response message proves possession of the correct token
- String tokenData = createSeqBean.getSecurityTokenData();
- if(tokenData != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
- MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
- OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
- SecurityToken token = secManager.recoverSecurityToken(tokenData);
- secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
- }
-
- String internalSequenceId = createSeqBean.getInternalSequenceID();
- if (internalSequenceId == null || "".equals(internalSequenceId)) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
- log.debug(message);
- throw new SandeshaException(message);
- }
- createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
-
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(createSeqResponseRMMsgCtx);
-
- createSeqBean.setSequenceID(newOutSequenceId);
- createSeqMgr.update(createSeqBean);
-
- SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
- if (createSequenceSenderBean == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
-
- // deleting the create sequence entry.
- retransmitterMgr.delete(createSeqMsgId);
-
- // storing new out sequence id
- SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
- SequencePropertyBean outSequenceBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, newOutSequenceId);
- SequencePropertyBean internalSequenceBean = new SequencePropertyBean(newOutSequenceId,
- Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, sequencePropertyKey);
-
- sequencePropMgr.insert(outSequenceBean);
- sequencePropMgr.insert(internalSequenceBean);
-
- // Store the security token under the new sequence id
- if(tokenData != null) {
- SequencePropertyBean newToken = new SequencePropertyBean(newOutSequenceId,
- Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
- sequencePropMgr.insert(newToken);
- }
-
- // processing for accept (offer has been sent)
- Accept accept = createSeqResponsePart.getAccept();
- if (accept != null) {
- // Find offered sequence from internal sequence id.
- SequencePropertyBean offeredSequenceBean = sequencePropMgr.retrieve(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
-
- // TODO this should be detected in the Fault manager.
- if (offeredSequenceBean == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String offeredSequenceId = (String) offeredSequenceBean.getValue();
-
- EndpointReference acksToEPR = accept.getAcksTo().getAddress().getEpr();
- SequencePropertyBean acksToBean = new SequencePropertyBean();
- acksToBean.setName(Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
- acksToBean.setSequencePropertyKey(offeredSequenceId);
- acksToBean.setValue(acksToEPR.getAddress());
-
- sequencePropMgr.insert(acksToBean);
-
- NextMsgBean nextMsgBean = new NextMsgBean();
- nextMsgBean.setSequenceID(offeredSequenceId);
- nextMsgBean.setNextMsgNoToProcess(1);
-
-
- boolean pollingMode = false;
- if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
- String replyToAddress = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
- if (replyToAddress!=null) {
- if (AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
- pollingMode = true;
- else if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
- pollingMode = true;
- else if (replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
- pollingMode = true;
- }
- }
-
- //Storing the createSequence of the sending side sequence as the reference message.
- //This can be used when creating new outgoing messages.
-
- String createSequenceMsgStoreKey = createSeqBean.getCreateSequenceMsgStoreKey();
- MessageContext createSequenceMsg = storageManager.retrieveMessageContext(createSequenceMsgStoreKey, configCtx);
-
- String newMessageStoreKey = SandeshaUtil.getUUID();
- storageManager.storeMessageContext(newMessageStoreKey,createSequenceMsg);
-
- nextMsgBean.setReferenceMessageKey(newMessageStoreKey);
-
- nextMsgBean.setPollingMode(pollingMode);
-
- //if PollingMode is true, starting the pollingmanager.
- if (pollingMode)
- SandeshaUtil.startPollingManager(configCtx);
-
- NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
- nextMsgMgr.insert(nextMsgBean);
-
- String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
-
- SequencePropertyBean specVersionBean = new SequencePropertyBean(offeredSequenceId,
- Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION, rmSpecVersion);
- sequencePropMgr.insert(specVersionBean);
-
- SequencePropertyBean receivedMsgBean = new SequencePropertyBean(offeredSequenceId,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
- sequencePropMgr.insert(receivedMsgBean);
-
- SequencePropertyBean msgsBean = new SequencePropertyBean();
- msgsBean.setSequencePropertyKey(offeredSequenceId);
- msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
- msgsBean.setValue("");
- sequencePropMgr.insert(msgsBean);
-
- // setting the addressing version.
- String addressingNamespace = createSeqResponseRMMsgCtx.getAddressingNamespaceValue();
- SequencePropertyBean addressingVersionBean = new SequencePropertyBean(offeredSequenceId,
- Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, addressingNamespace);
- sequencePropMgr.insert(addressingVersionBean);
-
- // Store the security token for the offered sequence
- if(tokenData != null) {
- SequencePropertyBean newToken = new SequencePropertyBean(offeredSequenceId,
- Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
- sequencePropMgr.insert(newToken);
- }
- }
-
- SenderBean target = new SenderBean();
- target.setInternalSequenceID(internalSequenceId);
- target.setSend(false);
- target.setReSend(true);
-
- Iterator iterator = retransmitterMgr.find(target).iterator();
- while (iterator.hasNext()) {
- SenderBean tempBean = (SenderBean) iterator.next();
-
- // updating the application message
- String key = tempBean.getMessageContextRefKey();
- MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
-
- // TODO make following exception message more understandable to the
- // user (probably some others exceptions messages as well)
- if (applicationMsg == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
-
- String rmVersion = SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
- String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
- RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
-
- Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (sequencePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- try {
- applicaionRMMsg.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- // asking to send the application msssage
- tempBean.setSend(true);
- retransmitterMgr.update(tempBean);
-
- // updating the message. this will correct the SOAP envelope string.
- storageManager.updateMessageContext(key, applicationMsg);
- }
-
- SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
-
- createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
-
- createSeqResponseRMMsgCtx.pause();
-
- if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
- }
-
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
- if (log.isDebugEnabled()) {
- log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
- log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
- }
-
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * Responsible for processing an incoming Create Sequence Response message.
+ */
+
+public class CreateSeqResponseMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
+
+ public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
+
+ ConfigurationContext configCtx = createSeqResponseRMMsgCtx.getMessageContext().getConfigurationContext();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+ .getAxisConfiguration());
+
+ // Processing the create sequence response.
+
+ CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+ if (createSeqResponsePart == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noCreateSeqResponse);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String newOutSequenceId = createSeqResponsePart.getIdentifier().getIdentifier();
+ if (newOutSequenceId == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.newSeqIdIsNull);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext().getRelatesTo();
+ if (relatesTo == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+ log.error(message);
+ throw new SandeshaException(message);
+ }
+ String createSeqMsgId = relatesTo.getValue();
+
+ SenderBeanMgr retransmitterMgr = storageManager.getRetransmitterBeanMgr();
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+
+ CreateSeqBean createSeqBean = createSeqMgr.retrieve(createSeqMsgId);
+ if (createSeqBean == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ // Check that the create sequence response message proves possession of the correct token
+ String tokenData = createSeqBean.getSecurityTokenData();
+ if(tokenData != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+ MessageContext crtSeqResponseCtx = createSeqResponseRMMsgCtx.getMessageContext();
+ OMElement body = crtSeqResponseCtx.getEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(tokenData);
+ secManager.checkProofOfPossession(token, body, crtSeqResponseCtx);
+ }
+
+ String internalSequenceId = createSeqBean.getInternalSequenceID();
+ if (internalSequenceId == null || "".equals(internalSequenceId)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.tempSeqIdNotSet);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+ createSeqResponseRMMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(createSeqResponseRMMsgCtx);
+
+ createSeqBean.setSequenceID(newOutSequenceId);
+ createSeqMgr.update(createSeqBean);
+
+ SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+ if (createSequenceSenderBean == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+ // deleting the create sequence entry.
+ retransmitterMgr.delete(createSeqMsgId);
+
+ // storing new out sequence id
+ SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
+ SequencePropertyBean outSequenceBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, newOutSequenceId);
+ SequencePropertyBean internalSequenceBean = new SequencePropertyBean(newOutSequenceId,
+ Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, sequencePropertyKey);
+
+ sequencePropMgr.insert(outSequenceBean);
+ sequencePropMgr.insert(internalSequenceBean);
+
+ // Store the security token under the new sequence id
+ if(tokenData != null) {
+ SequencePropertyBean newToken = new SequencePropertyBean(newOutSequenceId,
+ Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
+ sequencePropMgr.insert(newToken);
+ }
+
+ // processing for accept (offer has been sent)
+ Accept accept = createSeqResponsePart.getAccept();
+ if (accept != null) {
+ // Find offered sequence from internal sequence id.
+ SequencePropertyBean offeredSequenceBean = sequencePropMgr.retrieve(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
+
+ // TODO this should be detected in the Fault manager.
+ if (offeredSequenceBean == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String offeredSequenceId = (String) offeredSequenceBean.getValue();
+
+ EndpointReference acksToEPR = accept.getAcksTo().getAddress().getEpr();
+ SequencePropertyBean acksToBean = new SequencePropertyBean();
+ acksToBean.setName(Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+ acksToBean.setSequencePropertyKey(offeredSequenceId);
+ acksToBean.setValue(acksToEPR.getAddress());
+
+ sequencePropMgr.insert(acksToBean);
+
+ NextMsgBean nextMsgBean = new NextMsgBean();
+ nextMsgBean.setSequenceID(offeredSequenceId);
+ nextMsgBean.setNextMsgNoToProcess(1);
+
+
+ boolean pollingMode = false;
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
+ String replyToAddress = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
+ if (replyToAddress!=null) {
+ if (AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
+ pollingMode = true;
+ else if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
+ pollingMode = true;
+ else if (replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
+ pollingMode = true;
+ }
+ }
+
+ //Storing the referenceMessage of the sending side sequence as the reference message
+ //of the receiving side as well.
+ //This can be used when creating new outgoing messages.
+
+ String referenceMsgStoreKey = createSeqBean.getReferenceMessageStoreKey();
+ MessageContext referenceMsg = storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+
+ String newMessageStoreKey = SandeshaUtil.getUUID();
+ storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
+
+ nextMsgBean.setReferenceMessageKey(newMessageStoreKey);
+
+ nextMsgBean.setPollingMode(pollingMode);
+
+ //if PollingMode is true, starting the pollingmanager.
+ if (pollingMode)
+ SandeshaUtil.startPollingManager(configCtx);
+
+ NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+ nextMsgMgr.insert(nextMsgBean);
+
+ String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+
+ SequencePropertyBean specVersionBean = new SequencePropertyBean(offeredSequenceId,
+ Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION, rmSpecVersion);
+ sequencePropMgr.insert(specVersionBean);
+
+ SequencePropertyBean receivedMsgBean = new SequencePropertyBean(offeredSequenceId,
+ Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+ sequencePropMgr.insert(receivedMsgBean);
+
+ SequencePropertyBean msgsBean = new SequencePropertyBean();
+ msgsBean.setSequencePropertyKey(offeredSequenceId);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ msgsBean.setValue("");
+ sequencePropMgr.insert(msgsBean);
+
+ // setting the addressing version.
+ String addressingNamespace = createSeqResponseRMMsgCtx.getAddressingNamespaceValue();
+ SequencePropertyBean addressingVersionBean = new SequencePropertyBean(offeredSequenceId,
+ Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, addressingNamespace);
+ sequencePropMgr.insert(addressingVersionBean);
+
+ // Store the security token for the offered sequence
+ if(tokenData != null) {
+ SequencePropertyBean newToken = new SequencePropertyBean(offeredSequenceId,
+ Sandesha2Constants.SequenceProperties.SECURITY_TOKEN, tokenData);
+ sequencePropMgr.insert(newToken);
+ }
+ }
+
+ SenderBean target = new SenderBean();
+ target.setInternalSequenceID(internalSequenceId);
+ target.setSend(false);
+ target.setReSend(true);
+
+ Iterator iterator = retransmitterMgr.find(target).iterator();
+ while (iterator.hasNext()) {
+ SenderBean tempBean = (SenderBean) iterator.next();
+
+ // updating the application message
+ String key = tempBean.getMessageContextRefKey();
+ MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
+
+ // TODO make the following exception message more understandable to the
+ // user (and probably some other exception messages as well)
+ if (applicationMsg == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
+
+ String rmVersion = SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager);
+ if (rmVersion == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+ String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+
+ RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
+
+ Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ Identifier identifier = new Identifier(assumedRMNamespace);
+ identifier.setIndentifer(newOutSequenceId);
+
+ sequencePart.setIdentifier(identifier);
+
+ try {
+ applicaionRMMsg.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage(), e);
+ }
+
+ // asking to send the application message
+ tempBean.setSend(true);
+ retransmitterMgr.update(tempBean);
+
+ // updating the message. this will correct the SOAP envelope string.
+ storageManager.updateMessageContext(key, applicationMsg);
+ }
+
+ SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
+
+ createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+
+ createSeqResponseRMMsgCtx.pause();
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
+ log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
+ }
+
+ }
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Oct 10 21:31:51 2006
@@ -17,11 +17,13 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.workers.SenderWorker;
import org.apache.sandesha2.wsrm.Address;
import org.apache.sandesha2.wsrm.Identifier;
import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.MessagePending;
/**
* This class is responsible for processing MakeConnection request messages that come to the system.
@@ -58,16 +60,31 @@
findSenderBean.setSequenceID(identifier.getIdentifier());
//finding the beans that go with the criteria of the passed SenderBean
+
+ //beans with reSend=true
+ findSenderBean.setReSend(true);
Collection collection = senderBeanMgr.find(findSenderBean);
+ //beans with reSend=false
+ findSenderBean.setReSend (false);
+ Collection collection2 = senderBeanMgr.find(findSenderBean);
+
+ //all possible beans
+ collection.addAll(collection2);
+
//selecting a bean to send RANDOMLY. TODO- Should use a better mechanism.
int size = collection.size();
int itemToPick=-1;
+ boolean pending = false;
if (size>0) {
Random random = new Random ();
itemToPick = random.nextInt(size);
}
+
+ if (size>1)
+ pending = true; //there is more than one message to be delivered using the makeConnection.
+ //So the MessagePending header should have the value true.
Iterator it = collection.iterator();
@@ -91,8 +108,10 @@
String messageStorageKey = senderBean.getMessageContextRefKey();
MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,configurationContext);
+ RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
- addMessagePendingHeader ();
+
+ addMessagePendingHeader (returnRMMsg,pending);
setTransportProperties (returnMessage, rmMsgCtx);
@@ -111,7 +130,13 @@
worker.run();
}
- private void addMessagePendingHeader (){
+ private void addMessagePendingHeader (RMMsgContext returnMessage, boolean pending) throws SandeshaException {
+ String rmNamespace = returnMessage.getRMNamespaceValue();
+ MessagePending messagePending = new MessagePending (rmNamespace);
+ messagePending.setPending(pending);
+
+ messagePending.toSOAPEnvelope(returnMessage.getSOAPEnvelope());
+
}
public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,31 +1,30 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import org.apache.axis2.AxisFault;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.SandeshaException;
-
-/**
- * The message processor interface.
- */
-
-public interface MsgProcessor {
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.sandesha2.RMMsgContext;
+
+/**
+ * The message processor interface.
+ */
+
+public interface MsgProcessor {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,472 +1,470 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.MessageContextConstants;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
-import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.OutInAxisOperation;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.Utils;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-import org.apache.sandesha2.wsrm.TerminateSequence;
-
-/**
- * Responsible for processing an incoming Terminate Sequence message.
- */
-
-public class TerminateSeqMsgProcessor implements MsgProcessor {
-
- private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
-
- public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
-
- MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
-
- // Processing the terminate message
- // TODO Add terminate sequence message logic.
- TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
- .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- if (terminateSequence == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String sequenceId = terminateSequence.getIdentifier().getIdentifier();
- if (sequenceId == null || "".equals(sequenceId)) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
-
- ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
- SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
-
- // Check that the sender of this TerminateSequence holds the correct token
- SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(tokenBean != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
- OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
- SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
- secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
- }
-
- FaultManager faultManager = new FaultManager();
- SandeshaException fault = faultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId,
- storageManager);
- if (fault != null) {
- throw fault;
- }
-
-
- SequencePropertyBean terminateReceivedBean = new SequencePropertyBean();
- terminateReceivedBean.setSequencePropertyKey(sequencePropertyKey);
- terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
- terminateReceivedBean.setValue("true");
-
- sequencePropertyBeanMgr.insert(terminateReceivedBean);
-
- // add the terminate sequence response if required.
- RMMsgContext terminateSequenceResponse = null;
- if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
- terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, sequencePropertyKey, sequenceId, storageManager);
-
- setUpHighestMsgNumbers(context, storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
-
- TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequencePropertyKey, sequenceId, storageManager);
-
- SequencePropertyBean terminatedBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED, Sandesha2Constants.VALUE_TRUE);
-
- sequencePropertyBeanMgr.insert(terminatedBean);
-
- // removing an entry from the listener
- String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart();
-
- SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
-
- //sending the terminate sequence response
- if (terminateSequenceResponse != null) {
-
- MessageContext outMessage = terminateSequenceResponse.getMessageContext();
- EndpointReference toEPR = outMessage.getTo();
-
- AxisEngine engine = new AxisEngine(terminateSeqMsg
- .getConfigurationContext());
- engine.send(outMessage);
-
- String addressingNamespaceURI = SandeshaUtil
- .getSequenceProperty(
- sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
- storageManager);
-
- String anonymousURI = SpecSpecificConstants
- .getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (anonymousURI.equals(toEPR.getAddress())) {
- terminateSeqMsg.getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
- } else {
- terminateSeqMsg.getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
- }
-
- }
-
- terminateSeqMsg.pause();
-
- if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
- }
-
- private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
- String requestSidesequencePropertyKey, String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
-
- if (log.isDebugEnabled())
- log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
- String highestImMsgKey = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
- Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
-
- long highestInMsgNo = 0;
- if (highestImMsgNumberStr != null) {
- if (highestImMsgKey == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.highestMsgKeyNotStored, sequenceId));
-
- highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
- }
-
- // following will be valid only for the server side, since the obtained
- // int. seq ID is only valid there.
- String responseSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
-
- //sequencePropertyKey is equal to the internalSequenceId for the outgoing sequence.
- String responseSideSequencePropertyKey = responseSideInternalSequenceId;
-
- long highestOutMsgNo = 0;
- try {
- boolean addResponseSideTerminate = false;
- if (highestInMsgNo == 0) {
- addResponseSideTerminate = false;
- } else {
-
- // setting the last in message property
- SequencePropertyBean lastInMsgBean = new SequencePropertyBean(requestSidesequencePropertyKey,
- Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, highestImMsgNumberStr);
- seqPropMgr.insert(lastInMsgBean);
-
- MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey, configCtx);
-
- // TODO get the out message in a storage friendly manner.
- MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_FAULT_VALUE);
-
- if (highestOutMessage == null || highestOutMessage.getEnvelope() == null)
- highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
-
- if (highestOutMessage != null) {
- if (highestOutMessage.getEnvelope() == null)
- throw new SandeshaException(SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.outMsgHasNoEnvelope));
-
- RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
- Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- if (seqPartOfOutMsg != null) {
-
- // response message of the last in message can be
- // considered as the last out message.
- highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
- SequencePropertyBean highestOutMsgBean = new SequencePropertyBean(
- responseSideSequencePropertyKey ,
- Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(highestOutMsgNo)
- .toString());
-
- seqPropMgr.insert(highestOutMsgBean);
- addResponseSideTerminate = true;
- }
- }
- }
-
- // If all the out message have been acked, add the outgoing
- // terminate seq msg.
- String outgoingSqunceID = SandeshaUtil.getSequenceProperty(responseSideSequencePropertyKey,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
- if (addResponseSideTerminate && highestOutMsgNo > 0 && responseSideSequencePropertyKey != null
- && outgoingSqunceID != null) {
- boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(highestOutMsgNo, responseSideSequencePropertyKey,
- storageManager);
-
- if (allAcked)
- TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outgoingSqunceID,
- responseSideSequencePropertyKey, storageManager);
- }
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
- if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
- }
-
- private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, String sequencePropertyKey,String sequenceId,
- StorageManager storageManager) throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: TerminateSeqMsgProcessor::addTerminateSequenceResponse, " + sequenceId);
-
- MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
-
- MessageContext outMessage = null;
-
- try {
- outMessage = Utils.createOutMessageContext(terminateSeqMsg);
- } catch (AxisFault e1) {
- throw new SandeshaException(e1);
- }
-
- RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg,
- outMessage, storageManager);
-
- RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequencePropertyKey,
- sequenceId, storageManager);
-
- Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-
- if (iter.hasNext()) {
- SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
- if (seqAck==null) {
- String message = "No SequenceAcknowledgement part is present";
- throw new SandeshaException (message);
- }
-
- terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
- } else {
- //TODO
- }
-
- terminateSeqResponseRMMsg.addSOAPEnvelope();
-
- terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
- terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- outMessage.setResponseWritten(true);
-
- if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::addTerminateSequenceResponse");
-
- return terminateSeqResponseRMMsg;
-
-
- }
-
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
-
- if (log.isDebugEnabled())
- log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
-
- MessageContext msgContext = rmMsgCtx.getMessageContext();
- ConfigurationContext configurationContext = msgContext.getConfigurationContext();
- Options options = msgContext.getOptions();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
- configurationContext.getAxisConfiguration());
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-
- String toAddress = rmMsgCtx.getTo().getAddress();
- String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
- String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
- if (outSequenceID == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSeqenceID));
-
- // / Transaction addTerminateSeqTransaction =
- // storageManager.getTransaction();
-
- String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
- Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
-
- // registring an InOutOperationContext for this.
- // since the serviceContext.fireAndForget only sets a inOnly One
- // this does not work when there is a terminateSequnceResponse
- // TODO do processing of terminateMessagesCorrectly., create a new
- // message instead of sendign the one given by the serviceClient
- // TODO important
-
- AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
-
- AxisOperation referenceInOutOperation = msgContext.getAxisService()
- .getOperation(
- new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
- if (referenceInOutOperation == null) {
- String messge = "Cant find the recerence RM InOut operation";
- throw new SandeshaException(messge);
- }
-
- outInAxisOp.setParent(msgContext.getAxisService());
- // setting flows
- // outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
- outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
- .getRemainingPhasesInFlow());
-
- OperationContext opcontext = OperationContextFactory
- .createOperationContext(
- WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
- opcontext.setParent(msgContext.getServiceContext());
- configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
- opcontext);
-
- msgContext.setOperationContext(opcontext);
- msgContext.setAxisOperation(outInAxisOp);
-
- if (terminated != null && "true".equals(terminated)) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
- log.debug(message);
- return;
- }
-
- TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
-
- rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
- msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- rmMsgCtx.setTo(new EndpointReference(toAddress));
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager);
- if (rmVersion == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
- rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
- rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
- String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,
- Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
- if (transportTo != null) {
- rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo);
- }
-
- try {
- rmMsgCtx.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean terminateBean = new SenderBean();
- terminateBean.setMessageContextRefKey(key);
-
- storageManager.storeMessageContext(key, msgContext);
-
- // Set a retransmitter lastSentTime so that terminate will be send with
- // some delay.
- // Otherwise this get send before return of the current request (ack).
- // TODO: refine the terminate delay.
- terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
-
- terminateBean.setMessageID(msgContext.getMessageID());
-
- EndpointReference to = msgContext.getTo();
- if (to!=null)
- terminateBean.setToAddress(to.getAddress());
-
- // this will be set to true at the sender.
- terminateBean.setSend(true);
-
- msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- terminateBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
-
- retramsmitterMgr.insert(terminateBean);
-
- SequencePropertyBean terminateAdded = new SequencePropertyBean();
- terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- terminateAdded.setSequencePropertyKey(outSequenceID);
- terminateAdded.setValue("true");
-
- seqPropMgr.insert(terminateAdded);
-
- rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
-
- SandeshaUtil.executeAndStore(rmMsgCtx, key);
-
- if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
- }
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.OutInAxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Responsible for processing an incoming Terminate Sequence message.
+ */
+
+public class TerminateSeqMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
+
+ public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
+
+ MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+ // Processing the terminate message
+ // TODO Add terminate sequence message logic.
+ TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ if (terminateSequence == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+ if (sequenceId == null || "".equals(sequenceId)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
+
+ ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
+
+ // Check that the sender of this TerminateSequence holds the correct token
+ SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ if(tokenBean != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
+ OMElement body = terminateSeqRMMsg.getSOAPEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+ secManager.checkProofOfPossession(token, body, terminateSeqRMMsg.getMessageContext());
+ }
+
+ FaultManager faultManager = new FaultManager();
+ SandeshaException fault = faultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId,
+ storageManager);
+ if (fault != null) {
+ throw fault;
+ }
+
+
+ SequencePropertyBean terminateReceivedBean = new SequencePropertyBean();
+ terminateReceivedBean.setSequencePropertyKey(sequencePropertyKey);
+ terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
+ terminateReceivedBean.setValue("true");
+
+ sequencePropertyBeanMgr.insert(terminateReceivedBean);
+
+ // add the terminate sequence response if required.
+ RMMsgContext terminateSequenceResponse = null;
+ if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
+ terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, sequencePropertyKey, sequenceId, storageManager);
+
+ setUpHighestMsgNumbers(context, storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
+
+ TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequencePropertyKey, sequenceId, storageManager);
+
+ SequencePropertyBean terminatedBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED, Sandesha2Constants.VALUE_TRUE);
+
+ sequencePropertyBeanMgr.insert(terminatedBean);
+
+ SequenceManager.updateLastActivatedTime(sequencePropertyKey, storageManager);
+
+ //sending the terminate sequence response
+ if (terminateSequenceResponse != null) {
+
+ MessageContext outMessage = terminateSequenceResponse.getMessageContext();
+ EndpointReference toEPR = outMessage.getTo();
+
+ AxisEngine engine = new AxisEngine(terminateSeqMsg
+ .getConfigurationContext());
+
+
+ outMessage.setServerSide(false);
+ engine.send(outMessage);
+
+ String addressingNamespaceURI = SandeshaUtil
+ .getSequenceProperty(
+ sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
+ storageManager);
+
+ String anonymousURI = SpecSpecificConstants
+ .getAddressingAnonymousURI(addressingNamespaceURI);
+
+ if (anonymousURI.equals(toEPR.getAddress())) {
+ terminateSeqMsg.getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+ } else {
+ terminateSeqMsg.getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ }
+
+ }
+
+ terminateSeqMsg.pause();
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
+ }
+
+ private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
+ String requestSidesequencePropertyKey, String sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+ String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+ String highestImMsgKey = SandeshaUtil.getSequenceProperty(requestSidesequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+
+ long highestInMsgNo = 0;
+ if (highestImMsgNumberStr != null) {
+ if (highestImMsgKey == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.highestMsgKeyNotStored, sequenceId));
+
+ highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
+ }
+
+ // following will be valid only for the server side, since the obtained
+ // int. seq ID is only valid there.
+ String responseSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
+
+ //sequencePropertyKey is equal to the internalSequenceId for the outgoing sequence.
+ String responseSideSequencePropertyKey = responseSideInternalSequenceId;
+
+ long highestOutMsgNo = 0;
+ try {
+ boolean addResponseSideTerminate = false;
+ if (highestInMsgNo == 0) {
+ addResponseSideTerminate = false;
+ } else {
+
+ // setting the last in message property
+ SequencePropertyBean lastInMsgBean = new SequencePropertyBean(requestSidesequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO, highestImMsgNumberStr);
+ seqPropMgr.insert(lastInMsgBean);
+
+ MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey, configCtx);
+
+ // TODO get the out message in a storage friendly manner.
+ MessageContext highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_FAULT_VALUE);
+
+ if (highestOutMessage == null || highestOutMessage.getEnvelope() == null)
+ highestOutMessage = highestInMsg.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+
+ if (highestOutMessage != null) {
+ if (highestOutMessage.getEnvelope() == null)
+ throw new SandeshaException(SandeshaMessageHelper
+ .getMessage(SandeshaMessageKeys.outMsgHasNoEnvelope));
+
+ RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
+ Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+ if (seqPartOfOutMsg != null) {
+
+ // response message of the last in message can be
+ // considered as the last out message.
+ highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
+ SequencePropertyBean highestOutMsgBean = new SequencePropertyBean(
+ responseSideSequencePropertyKey ,
+ Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO, new Long(highestOutMsgNo)
+ .toString());
+
+ seqPropMgr.insert(highestOutMsgBean);
+ addResponseSideTerminate = true;
+ }
+ }
+ }
+
+ // If all the out message have been acked, add the outgoing
+ // terminate seq msg.
+ String outgoingSqunceID = SandeshaUtil.getSequenceProperty(responseSideSequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
+ if (addResponseSideTerminate && highestOutMsgNo > 0 && responseSideSequencePropertyKey != null
+ && outgoingSqunceID != null) {
+ boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(highestOutMsgNo, responseSideSequencePropertyKey,
+ storageManager);
+
+ if (allAcked)
+ TerminateManager.addTerminateSequenceMessage(terminateRMMsg, outgoingSqunceID,
+ responseSideSequencePropertyKey, storageManager);
+ }
+ } catch (AxisFault e) {
+ throw new SandeshaException(e);
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
+ }
+
+ private RMMsgContext getTerminateSequenceResponse(RMMsgContext terminateSeqRMMsg, String sequencePropertyKey,String sequenceId,
+ StorageManager storageManager) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::addTerminateSequenceResponse, " + sequenceId);
+
+ MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+ MessageContext outMessage = null;
+
+ try {
+ outMessage = Utils.createOutMessageContext(terminateSeqMsg);
+ } catch (AxisFault e1) {
+ throw new SandeshaException(e1);
+ }
+
+ RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg,
+ outMessage, storageManager);
+
+ RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequencePropertyKey,
+ sequenceId, storageManager);
+
+ Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+ if (iter.hasNext()) {
+ SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next();
+ if (seqAck==null) {
+ String message = "No SequenceAcknowledgement part is present";
+ throw new SandeshaException (message);
+ }
+
+ terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
+ } else {
+ //TODO
+ }
+
+ terminateSeqResponseRMMsg.addSOAPEnvelope();
+
+ terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+ terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ outMessage.setResponseWritten(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::addTerminateSequenceResponse");
+
+ return terminateSeqResponseRMMsg;
+
+
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+ Options options = msgContext.getOptions();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+ configurationContext.getAxisConfiguration());
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
+
+ String toAddress = rmMsgCtx.getTo().getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+
+ String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,
+ Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID, storageManager);
+ if (outSequenceID == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSeqenceID));
+
+ // / Transaction addTerminateSeqTransaction =
+ // storageManager.getTransaction();
+
+ String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
+
+ // registring an InOutOperationContext for this.
+ // since the serviceContext.fireAndForget only sets a inOnly One
+ // this does not work when there is a terminateSequnceResponse
+ // TODO do processing of terminateMessagesCorrectly., create a new
+ // message instead of sendign the one given by the serviceClient
+ // TODO important
+
+ AxisOperation outInAxisOp = new OutInAxisOperation(new QName("temp"));
+
+ AxisOperation referenceInOutOperation = msgContext.getAxisService()
+ .getOperation(
+ new QName(Sandesha2Constants.RM_IN_OUT_OPERATION_NAME));
+ if (referenceInOutOperation == null) {
+ String messge = "Cant find the recerence RM InOut operation";
+ throw new SandeshaException(messge);
+ }
+
+ outInAxisOp.setParent(msgContext.getAxisService());
+ // setting flows
+ // outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation.getRemainingPhasesInFlow());
+ outInAxisOp.setRemainingPhasesInFlow(referenceInOutOperation
+ .getRemainingPhasesInFlow());
+
+ OperationContext opcontext = OperationContextFactory
+ .createOperationContext(
+ WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, outInAxisOp);
+ opcontext.setParent(msgContext.getServiceContext());
+ configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),
+ opcontext);
+
+ msgContext.setOperationContext(opcontext);
+ msgContext.setAxisOperation(outInAxisOp);
+
+ if (terminated != null && "true".equals(terminated)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
+ log.debug(message);
+ return;
+ }
+
+ TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
+
+ rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID, storageManager);
+ if (rmVersion == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+
+ rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,
+ Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+ if (transportTo != null) {
+ rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL, transportTo);
+ }
+
+ try {
+ rmMsgCtx.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage(),e);
+ }
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setMessageContextRefKey(key);
+
+ storageManager.storeMessageContext(key, msgContext);
+
+ // Set a retransmitter lastSentTime so that terminate will be send with
+ // some delay.
+ // Otherwise this get send before return of the current request (ack).
+ // TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis() + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(msgContext.getMessageID());
+
+ EndpointReference to = msgContext.getTo();
+ if (to!=null)
+ terminateBean.setToAddress(to.getAddress());
+
+ // this will be set to true at the sender.
+ terminateBean.setSend(true);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequencePropertyKey(outSequenceID);
+ terminateAdded.setValue("true");
+
+ seqPropMgr.insert(terminateAdded);
+
+ rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+ SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
+ }
+
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Tue Oct 10 21:31:51 2006
@@ -1,90 +1,89 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.TerminateSequence;
-import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
-
-/**
- * To process terminate sequence response messages.
- */
-public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
-
- private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
-
- public void processInMessage(RMMsgContext terminateResRMMsg)
- throws AxisFault {
- if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
-
- MessageContext msgContext = terminateResRMMsg.getMessageContext();
- ConfigurationContext context = terminateResRMMsg.getConfigurationContext();
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
- SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropertyBeanMgr();
-
- TerminateSequenceResponse tsResponse = (TerminateSequenceResponse)
- terminateResRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
-
- String sequenceId = tsResponse.getIdentifier().getIdentifier();
- String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceId,
- Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
- msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateResRMMsg);
-
- // Check that the sender of this TerminateSequence holds the correct token
- SequencePropertyBean tokenBean = sequencePropertyBeanMgr.retrieve(sequencePropertyKey, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(tokenBean != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(context);
- OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
- SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
- secManager.checkProofOfPossession(token, body, msgContext);
- }
-
- ConfigurationContext configContext = msgContext.getConfigurationContext();
-
-
- TerminateManager.terminateSendingSide (configContext, sequencePropertyKey,internalSequenceID, msgContext.isServerSide(),
- storageManager);
-
- // Stop this message travelling further through the Axis runtime
- terminateResRMMsg.pause();
-
- if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
- }
-
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
- if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
- if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * Message processor for TerminateSequenceResponse messages. An incoming response
+ * causes the sending side of the matching sequence to be terminated and the message
+ * to be paused; outgoing messages are only traced.
+ */
+public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
+
+    private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
+
+    /**
+     * Handles a received TerminateSequenceResponse: resolves the internal sequence id
+     * from the response's identifier, verifies the sender's security token when one
+     * is stored for the sequence, terminates the sending side and pauses the message.
+     *
+     * @param terminateResRMMsg the received TerminateSequenceResponse message
+     * @throws AxisFault if any of the called storage, security or termination
+     *                   helpers fails
+     */
+    public void processInMessage(RMMsgContext terminateResRMMsg)
+            throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
+        }
+
+        final MessageContext inMsgCtx = terminateResRMMsg.getMessageContext();
+        final ConfigurationContext rmConfigCtx = terminateResRMMsg.getConfigurationContext();
+
+        final StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmConfigCtx,
+                rmConfigCtx.getAxisConfiguration());
+        final SequencePropertyBeanMgr propertyMgr = storageManager.getSequencePropertyBeanMgr();
+
+        // NOTE(review): the response part is dereferenced without a null check;
+        // presumably it is always present for this message type - confirm.
+        final TerminateSequenceResponse responsePart = (TerminateSequenceResponse) terminateResRMMsg
+                .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+        final String sequenceId = responsePart.getIdentifier().getIdentifier();
+
+        final String internalSequenceId = SandeshaUtil.getSequenceProperty(sequenceId,
+                Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
+        inMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
+
+        // Looked up only after the INTERNAL_SEQUENCE_ID property has been set on the
+        // message context; presumably the lookup reads it - confirm before reordering.
+        final String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(terminateResRMMsg);
+
+        // Check that the sender of this TerminateSequenceResponse holds the correct token.
+        final SequencePropertyBean tokenBean = propertyMgr.retrieve(sequencePropertyKey,
+                Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+        if (tokenBean != null) {
+            checkSenderToken(tokenBean, rmConfigCtx, terminateResRMMsg, inMsgCtx);
+        }
+
+        final ConfigurationContext msgConfigCtx = inMsgCtx.getConfigurationContext();
+
+        TerminateManager.terminateSendingSide(msgConfigCtx, sequencePropertyKey, internalSequenceId,
+                inMsgCtx.isServerSide(), storageManager);
+
+        // Stop this message travelling further through the Axis runtime.
+        terminateResRMMsg.pause();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
+        }
+    }
+
+    /**
+     * Recovers the security token stored for the sequence and checks proof of
+     * possession against the SOAP body of the received message.
+     */
+    private static void checkSenderToken(SequencePropertyBean tokenBean, ConfigurationContext configCtx,
+            RMMsgContext terminateResRMMsg, MessageContext msgContext) throws AxisFault {
+        final SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+        final OMElement body = terminateResRMMsg.getSOAPEnvelope().getBody();
+        final SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+        secManager.checkProofOfPossession(token, body, msgContext);
+    }
+
+    /**
+     * Outgoing messages need no processing here; only the enter and exit traces are
+     * written.
+     *
+     * @param rmMsgCtx the outgoing message (unused)
+     * @throws SandeshaException declared by the signature, never thrown by this body
+     */
+    public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
+        }
+    }
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/CreateSeqBean.java Tue Oct 10 21:31:51 2006
@@ -62,6 +62,13 @@
*/
private String createSequenceMsgStoreKey;
+ /**
+ * This is stored here so that the message it points to can be used as a reference when Sandesha
+ * wants to generate new messages (e.g. MakeConnection). The create sequence message cannot be used
+ * here since it may be subject to things like encryption.
+ */
+ private String referenceMessageStoreKey;
+
public CreateSeqBean() {
}
@@ -104,6 +111,15 @@
public void setCreateSequenceMsgStoreKey(String createSequenceMsgStoreKey) {
this.createSequenceMsgStoreKey = createSequenceMsgStoreKey;
+ }
+
+
+ /**
+ * Returns the storage key of the reference message held by this bean.
+ *
+ * @return the current value of the referenceMessageStoreKey field
+ */
+ public String getReferenceMessageStoreKey() {
+ return this.referenceMessageStoreKey;
+ }
+
+ /**
+ * Sets the storage key of the reference message held by this bean.
+ *
+ * @param referenceMessageStoreKey the key to keep in this bean
+ */
+ public void setReferenceMessageStoreKey(String referenceMessageStoreKey) {
+ this.referenceMessageStoreKey = referenceMessageStoreKey;
 }
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=462697&r1=462696&r2=462697
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Oct 10 21:31:51 2006
@@ -123,8 +123,8 @@
if (bean.isSend() != temp.isSend())
add = false;
-// if (bean.isReSend() != temp.isReSend())
-// add = false;
+ if (bean.isReSend() != temp.isReSend())
+ add = false;
if (add)
beans.add(temp);
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org