You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/02/01 14:53:38 UTC
svn commit: r502213 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2:
msgprocessors/ApplicationMsgProcessor.java
msgprocessors/CreateSeqResponseMsgProcessor.java
storage/beans/SenderBean.java util/RMMsgCreator.java
workers/SenderWorker.java
Author: mlovett
Date: Thu Feb 1 05:53:37 2007
New Revision: 502213
URL: http://svn.apache.org/viewvc?view=rev&rev=502213
Log:
Avoid loading and saving message contexts when we get a create sequence response
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Feb 1 05:53:37 2007
@@ -53,11 +53,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.CreateSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceOffer;
@@ -592,16 +588,8 @@
else
rmMsg.setTo(toEPR);
- String rmVersion = rmsBean.getRMVersion();
-
- String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
- Sequence sequence = new Sequence(rmNamespaceValue);
- MessageNumber msgNumber = new MessageNumber(rmNamespaceValue);
- msgNumber.setMessageNumber(messageNumber);
- sequence.setMessageNumber(msgNumber);
-
// setting last message
+ boolean lastMessage = false;
if (msg.isServerSide()) {
MessageContext requestMsg = null;
@@ -617,7 +605,7 @@
}
if (requestSequence.getLastMessage() != null) {
- sequence.setLastMessage(new LastMessage(rmNamespaceValue));
+ lastMessage = true;
}
} else {
@@ -627,51 +615,15 @@
if (operationContext != null) {
Object obj = msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
if (obj != null && "true".equals(obj)) {
-
- if (SpecSpecificConstants.isLastMessageIndicatorRequired(rmVersion))
- sequence.setLastMessage(new LastMessage(rmNamespaceValue));
+ lastMessage = true;
}
}
}
- AckRequested ackRequested = null;
-
- boolean addAckRequested = false;
- // if (!lastMessage)
- // addAckRequested = true; //TODO decide the policy to add the
- // ackRequested tag
-
- // setting the Sequence id.
- // Set send = true/false depending on the availability of the out
- // sequence id.
- String identifierStr = null;
- if (outSequenceID == null) {
- identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- } else {
- identifierStr = outSequenceID;
- }
-
- Identifier id1 = new Identifier(rmNamespaceValue);
- id1.setIndentifer(identifierStr);
- sequence.setIdentifier(id1);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
-
- if (addAckRequested) {
- ackRequested = new AckRequested(rmNamespaceValue);
- Identifier id2 = new Identifier(rmNamespaceValue);
- id2.setIndentifer(identifierStr);
- ackRequested.setIdentifier(id2);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequested);
- }
-
- // Now that we have added the headers to the message, make sure that we secure it with
- // the correct token.
+ // Now that we have decided which sequence to use for the message, make sure that we secure
+ // it with the correct token.
RMMsgCreator.secureOutboundMessage(rmsBean, msg);
- rmMsg.addSOAPEnvelope();
-
-
// Retransmitter bean entry for the application message
SenderBean appMsgEntry = new SenderBean();
@@ -680,6 +632,7 @@
appMsgEntry.setTimeToSend(System.currentTimeMillis());
appMsgEntry.setMessageID(rmMsg.getMessageId());
appMsgEntry.setMessageNumber(messageNumber);
+ appMsgEntry.setLastMessage(lastMessage);
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
if (outSequenceID == null) {
appMsgEntry.setSend(false);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Feb 1 05:53:37 2007
@@ -44,17 +44,10 @@
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Accept;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* Responsible for processing an incoming Create Sequence Response message.
@@ -226,105 +219,20 @@
rmsBean.setLastActivatedTime(System.currentTimeMillis());
rmsBeanMgr.update(rmsBean);
+ // Locate and update all of the messages for this sequence, now that we know
+ // the sequence id.
SenderBean target = new SenderBean();
target.setInternalSequenceID(internalSequenceId);
target.setSend(false);
-
+
Iterator iterator = retransmitterMgr.find(target).iterator();
while (iterator.hasNext()) {
SenderBean tempBean = (SenderBean) iterator.next();
- // updating the application message
- String key = tempBean.getMessageContextRefKey();
- MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
-
- // TODO make following exception message more understandable to the
- // user (probably some others exceptions messages as well)
- if (applicationMsg == null)
- throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
-
- String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());
-
- RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
-
- if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
-
- Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (sequencePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-
- TerminateSequence sequencePart = (TerminateSequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- if (sequencePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
-
- CloseSequence sequencePart = (CloseSequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
- if (sequencePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
-
- Iterator headerIterator = applicaionRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
-
- AckRequested sequencePart = null;
- while (headerIterator.hasNext()) {
- sequencePart = (AckRequested) headerIterator.next();
- }
-
- if (headerIterator.hasNext()) {
- throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
- }
-
- if (sequencePart == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- sequencePart.getIdentifier().setIndentifer(newOutSequenceId);
-
- }
-
- try {
- applicaionRMMsg.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage(), e);
- }
-
// asking to send the application message
tempBean.setSend(true);
tempBean.setSequenceID(newOutSequenceId);
retransmitterMgr.update(tempBean);
-
- // updating the message. this will correct the SOAP envelope string.
- storageManager.updateMessageContext(key, applicationMsg);
}
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Thu Feb 1 05:53:37 2007
@@ -96,6 +96,11 @@
private int messageType =0;
/**
+ * Flag to indicate if this is the last message for the sequence
+ */
+ private boolean lastMessage = false;
+
+ /**
* Flags that are used to check if the primitive types on this bean
* have been set. If a primitive type has not been set then it will
* be ignored within the match method.
@@ -107,6 +112,7 @@
private static final int RESEND_FLAG = 0x00001000;
private static final int TIME_TO_SEND_FLAG = 0x00010000;
private static final int MSG_TYPE_FLAG = 0x00100000;
+ private static final int LAST_MSG_FLAG = 0x01000000;
public SenderBean() {
@@ -216,6 +222,15 @@
public void setToAddress(String toAddress) {
this.toAddress = toAddress;
}
+
+ public boolean isLastMessage() {
+ return lastMessage;
+ }
+
+ public void setLastMessage(boolean lastMessage) {
+ this.lastMessage = lastMessage;
+ this.flags |= LAST_MSG_FLAG;
+ }
public String toString() {
StringBuffer result = new StringBuffer();
@@ -271,6 +286,10 @@
else if((bean.flags & MSG_TYPE_FLAG) != 0 && bean.getMessageType() != this.getMessageType())
match = false;
+ else if((bean.flags & LAST_MSG_FLAG) != 0 && bean.isLastMessage() != this.isLastMessage())
+ match = false;
+
return match;
}
+
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Thu Feb 1 05:53:37 2007
@@ -91,27 +91,21 @@
if (context == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
- MessageContext createSeqmsgContext;
- try {
- // creating by copying common contents. (this will not set contexts
- // except for configCtx).
- AxisOperation createSequenceOperation = SpecSpecificConstants.getWSRMOperation(
- Sandesha2Constants.MessageTypes.CREATE_SEQ,
- rmsBean.getRMVersion(),
- applicationMsgContext.getAxisService());
+ // creating by copying common contents. (this will not set contexts
+ // except for configCtx).
+ AxisOperation createSequenceOperation = SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.CREATE_SEQ,
+ rmsBean.getRMVersion(),
+ applicationMsgContext.getAxisService());
- createSeqmsgContext = SandeshaUtil
- .createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
-
- OperationContext createSeqOpCtx = createSeqmsgContext.getOperationContext();
- String createSeqMsgId = SandeshaUtil.getUUID();
- createSeqmsgContext.setMessageID(createSeqMsgId);
- context.registerOperationContext(createSeqMsgId, createSeqOpCtx);
+ MessageContext createSeqmsgContext = SandeshaUtil
+ .createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
+
+ OperationContext createSeqOpCtx = createSeqmsgContext.getOperationContext();
+ String createSeqMsgId = SandeshaUtil.getUUID();
+ createSeqmsgContext.setMessageID(createSeqMsgId);
+ context.registerOperationContext(createSeqMsgId, createSeqOpCtx);
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage(), e);
- }
-
RMMsgContext createSeqRMMsg = new RMMsgContext(createSeqmsgContext);
String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Thu Feb 1 05:53:37 2007
@@ -1,6 +1,7 @@
package org.apache.sandesha2.workers;
import java.util.ArrayList;
+import java.util.Iterator;
import javax.xml.namespace.QName;
@@ -30,6 +31,7 @@
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
@@ -37,6 +39,12 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.LastMessage;
+import org.apache.sandesha2.wsrm.MessageNumber;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -149,16 +157,6 @@
int messageType = senderBean.getMessageType();
-// if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
-// Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-// String sequenceID = sequence.getIdentifier().getIdentifier();
-// }
-
-// if (AcknowledgementManager.ackRequired (rmMsgCtx)) {
-// RMMsgCreator.addAckMessage(rmMsgCtx);
-
- //} else
-
if (isAckPiggybackableMsgType(messageType)) { // checking whether this message can carry piggybacked acks
// checking whether this message can carry piggybacked acks
// piggybacking if an ack is available for the same
@@ -332,12 +330,73 @@
log.debug("Exit: SenderWorker::run");
}
+ /**
+ * Update the message before sending it. We adjust the retransmission intervals and send counts
+ * for the message. If the message is an application message then we ensure that we have added
+ * the Sequence header.
+ */
private boolean updateMessage(RMMsgContext rmMsgContext, SenderBean senderBean, StorageManager storageManager) throws AxisFault {
boolean continueSending = MessageRetransmissionAdjuster.adjustRetransmittion(
rmMsgContext, senderBean, rmMsgContext.getConfigurationContext(), storageManager);
+ if(!continueSending) return false;
+
+ Identifier id = null;
+
+ if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+ RMSequenceBean bean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, senderBean.getSequenceID());
+ String namespace = SpecSpecificConstants.getRMNamespaceValue(bean.getRMVersion());
+ Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if(sequence == null) {
+ sequence = new Sequence(namespace);
+
+ MessageNumber msgNumber = new MessageNumber(namespace);
+ msgNumber.setMessageNumber(senderBean.getMessageNumber());
+ sequence.setMessageNumber(msgNumber);
+
+ if(senderBean.isLastMessage() &&
+ SpecSpecificConstants.isLastMessageIndicatorRequired(bean.getRMVersion())) {
+ sequence.setLastMessage(new LastMessage(namespace));
+ }
+
+ // We just create the id here, we will add the value in later
+ id = new Identifier(namespace);
+ sequence.setIdentifier(id);
+
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
+ }
+
+ } else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ TerminateSequence terminate = (TerminateSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ id = terminate.getIdentifier();
+
+ } else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
+ CloseSequence close = (CloseSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+ id = close.getIdentifier();
+
+ } else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+ // The only time that we can have a message of this type is when we are sending a
+ // stand-alone ack request, and in that case we only expect to find a single ack
+ // request header in the message.
+ Iterator ackRequests = rmMsgContext.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
+ AckRequested ackRequest = (AckRequested) ackRequests.next();
+ if (ackRequests.hasNext()) {
+ throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
+ }
+ id = ackRequest.getIdentifier();
+ }
+
+ // TODO consider adding an extra ack request, as we are about to send the message and we
+ // know which sequence it is associated with.
+
+ if(id != null && !senderBean.getSequenceID().equals(id.getIdentifier())) {
+ id.setIndentifer(senderBean.getSequenceID());
+
+ // Write the changes back into the message context
+ rmMsgContext.addSOAPEnvelope();
+ }
- return continueSending;
+ return true;
}
private boolean isAckPiggybackableMsgType(int messageType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org