You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/02/01 14:53:38 UTC

svn commit: r502213 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java msgprocessors/CreateSeqResponseMsgProcessor.java storage/beans/SenderBean.java util/RMMsgCreator.java workers/SenderWorker.java

Author: mlovett
Date: Thu Feb  1 05:53:37 2007
New Revision: 502213

URL: http://svn.apache.org/viewvc?view=rev&rev=502213
Log:
Avoid loading and saving message contexts when we get a create sequence response

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Feb  1 05:53:37 2007
@@ -53,11 +53,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.CreateSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
 import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.SequenceOffer;
 
@@ -592,16 +588,8 @@
 		else
 			rmMsg.setTo(toEPR);
 
-		String rmVersion = rmsBean.getRMVersion();
-
-		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
-		Sequence sequence = new Sequence(rmNamespaceValue);
-		MessageNumber msgNumber = new MessageNumber(rmNamespaceValue);
-		msgNumber.setMessageNumber(messageNumber);
-		sequence.setMessageNumber(msgNumber);
-
 		// setting last message
+		boolean lastMessage = false;
 		if (msg.isServerSide()) {
 			MessageContext requestMsg = null;
 
@@ -617,7 +605,7 @@
 			}
 
 			if (requestSequence.getLastMessage() != null) {
-				sequence.setLastMessage(new LastMessage(rmNamespaceValue));
+				lastMessage = true;
 			}
 
 		} else {
@@ -627,51 +615,15 @@
 			if (operationContext != null) {
 				Object obj = msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
 				if (obj != null && "true".equals(obj)) {
-
-					if (SpecSpecificConstants.isLastMessageIndicatorRequired(rmVersion))
-						sequence.setLastMessage(new LastMessage(rmNamespaceValue));
+					lastMessage = true;
 				}
 			}
 		}
 
-		AckRequested ackRequested = null;
-
-		boolean addAckRequested = false;
-		// if (!lastMessage)
-		// addAckRequested = true; //TODO decide the policy to add the
-		// ackRequested tag
-
-		// setting the Sequence id.
-		// Set send = true/false depending on the availability of the out
-		// sequence id.
-		String identifierStr = null;
-		if (outSequenceID == null) {
-			identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
-		} else {
-			identifierStr = outSequenceID;
-		}
-
-		Identifier id1 = new Identifier(rmNamespaceValue);
-		id1.setIndentifer(identifierStr);
-		sequence.setIdentifier(id1);
-		rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
-
-		if (addAckRequested) {
-			ackRequested = new AckRequested(rmNamespaceValue);
-			Identifier id2 = new Identifier(rmNamespaceValue);
-			id2.setIndentifer(identifierStr);
-			ackRequested.setIdentifier(id2);
-			rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequested);
-		}
-
-		// Now that we have added the headers to the message, make sure that we secure it with
-		// the correct token.
+		// Now that we have decided which sequence to use for the message, make sure that we secure
+		// it with the correct token.
 		RMMsgCreator.secureOutboundMessage(rmsBean, msg);
 
-		rmMsg.addSOAPEnvelope();
-
-
 		// Retransmitter bean entry for the application message
 		SenderBean appMsgEntry = new SenderBean();
 
@@ -680,6 +632,7 @@
 		appMsgEntry.setTimeToSend(System.currentTimeMillis());
 		appMsgEntry.setMessageID(rmMsg.getMessageId());
 		appMsgEntry.setMessageNumber(messageNumber);
+		appMsgEntry.setLastMessage(lastMessage);
 		appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
 		if (outSequenceID == null) {
 			appMsgEntry.setSend(false);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Feb  1 05:53:37 2007
@@ -44,17 +44,10 @@
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.wsrm.Accept;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.CreateSequenceResponse;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
  * Responsible for processing an incoming Create Sequence Response message.
@@ -226,105 +219,20 @@
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
 		rmsBeanMgr.update(rmsBean);
 
+		// Locate and update all of the messages for this sequence, now that we know
+		// the sequence id.
 		SenderBean target = new SenderBean();
 		target.setInternalSequenceID(internalSequenceId);
 		target.setSend(false);
-
+		
 		Iterator iterator = retransmitterMgr.find(target).iterator();
 		while (iterator.hasNext()) {
 			SenderBean tempBean = (SenderBean) iterator.next();
 
-			// updating the application message
-			String key = tempBean.getMessageContextRefKey();
-			MessageContext applicationMsg = storageManager.retrieveMessageContext(key, configCtx);
-
-			// TODO make following exception message more understandable to the
-			// user (probably some others exceptions messages as well)
-			if (applicationMsg == null)
-				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
-
-			String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());
-
-			RMMsgContext applicaionRMMsg = MsgInitializer.initializeMessage(applicationMsg);
-
-			if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
-				
-				Sequence sequencePart = (Sequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-				if (sequencePart == null) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
-					log.debug(message);
-					throw new SandeshaException(message);
-				}
-	
-				Identifier identifier = new Identifier(assumedRMNamespace);
-				identifier.setIndentifer(newOutSequenceId);
-	
-				sequencePart.setIdentifier(identifier);
-				
-			} else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-				
-				TerminateSequence sequencePart = (TerminateSequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-				if (sequencePart == null) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
-					log.debug(message);
-					throw new SandeshaException(message);
-				}
-	
-				Identifier identifier = new Identifier(assumedRMNamespace);
-				identifier.setIndentifer(newOutSequenceId);
-	
-				sequencePart.setIdentifier(identifier);
-
-			} else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
-			
-				CloseSequence sequencePart = (CloseSequence) applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
-				if (sequencePart == null) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
-					log.debug(message);
-					throw new SandeshaException(message);
-				}
-	
-				Identifier identifier = new Identifier(assumedRMNamespace);
-				identifier.setIndentifer(newOutSequenceId);
-	
-				sequencePart.setIdentifier(identifier);
-				
-			} else if (tempBean.getMessageType() == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
-
-				Iterator headerIterator = applicaionRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
-								
-				AckRequested sequencePart = null;
-				while (headerIterator.hasNext()) {
-					sequencePart = (AckRequested) headerIterator.next(); 
-				}
-				
-				if (headerIterator.hasNext()) {
-					throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
-				}
-				
-				if (sequencePart == null) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
-					log.debug(message);
-					throw new SandeshaException(message);
-				}
-				
-				sequencePart.getIdentifier().setIndentifer(newOutSequenceId);
-					
-			}
-
-			try {
-				applicaionRMMsg.addSOAPEnvelope();
-			} catch (AxisFault e) {
-				throw new SandeshaException(e.getMessage(), e);
-			}
-
 			// asking to send the application msssage
 			tempBean.setSend(true);
 			tempBean.setSequenceID(newOutSequenceId);
 			retransmitterMgr.update(tempBean);
-
-			// updating the message. this will correct the SOAP envelope string.
-			storageManager.updateMessageContext(key, applicationMsg);
 		}
 
 		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Thu Feb  1 05:53:37 2007
@@ -96,6 +96,11 @@
 	private int messageType =0;
 	
 	/**
+	 * Flag to indicate if this is the last message for the sequence
+	 */
+	private boolean lastMessage = false;
+	
+	/**
 	 * Flags that are used to check if the primitive types on this bean
 	 * have been set. If a primitive type has not been set then it will
 	 * be ignored within the match method.
@@ -107,6 +112,7 @@
 	private static final int RESEND_FLAG       = 0x00001000;
 	private static final int TIME_TO_SEND_FLAG = 0x00010000;
 	private static final int MSG_TYPE_FLAG     = 0x00100000;
+	private static final int LAST_MSG_FLAG     = 0x01000000;
 
 	public SenderBean() {
 
@@ -216,6 +222,15 @@
 	public void setToAddress(String toAddress) {
 		this.toAddress = toAddress;
 	}
+
+	public boolean isLastMessage() {
+		return lastMessage;
+	}
+
+	public void setLastMessage(boolean lastMessage) {
+		this.lastMessage = lastMessage;
+		this.flags |= LAST_MSG_FLAG;
+	}
 	
 	public String toString() {
 		StringBuffer result = new StringBuffer();
@@ -271,6 +286,10 @@
 		else if((bean.flags & MSG_TYPE_FLAG) != 0 && bean.getMessageType() != this.getMessageType())
 			match = false;
 		
+		else if((bean.flags & LAST_MSG_FLAG) != 0 && bean.isLastMessage() != this.isLastMessage())
+			match = false;
+
 		return match;
 	}
+
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Thu Feb  1 05:53:37 2007
@@ -91,27 +91,21 @@
 		if (context == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
 
-		MessageContext createSeqmsgContext;
-		try {
-			// creating by copying common contents. (this will not set contexts
-			// except for configCtx).
-			AxisOperation createSequenceOperation = SpecSpecificConstants.getWSRMOperation(
-					Sandesha2Constants.MessageTypes.CREATE_SEQ,
-					rmsBean.getRMVersion(),
-					applicationMsgContext.getAxisService());
+		// creating by copying common contents. (this will not set contexts
+		// except for configCtx).
+		AxisOperation createSequenceOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.CREATE_SEQ,
+				rmsBean.getRMVersion(),
+				applicationMsgContext.getAxisService());
 
-			createSeqmsgContext = SandeshaUtil
-					.createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
-			
-			OperationContext createSeqOpCtx = createSeqmsgContext.getOperationContext();
-			String createSeqMsgId = SandeshaUtil.getUUID();
-			createSeqmsgContext.setMessageID(createSeqMsgId);
-			context.registerOperationContext(createSeqMsgId, createSeqOpCtx);
+		MessageContext createSeqmsgContext = SandeshaUtil
+				.createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
+		
+		OperationContext createSeqOpCtx = createSeqmsgContext.getOperationContext();
+		String createSeqMsgId = SandeshaUtil.getUUID();
+		createSeqmsgContext.setMessageID(createSeqMsgId);
+		context.registerOperationContext(createSeqMsgId, createSeqOpCtx);
 
-		} catch (AxisFault e) {
-			throw new SandeshaException(e.getMessage(), e);
-		}
-        
 		RMMsgContext createSeqRMMsg = new RMMsgContext(createSeqmsgContext);
 
 		String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Thu Feb  1 05:53:37 2007
@@ -1,6 +1,7 @@
 package org.apache.sandesha2.workers;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.xml.namespace.QName;
 
@@ -30,6 +31,7 @@
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
@@ -37,6 +39,12 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.LastMessage;
+import org.apache.sandesha2.wsrm.MessageNumber;
+import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -149,16 +157,6 @@
 
 			int messageType = senderBean.getMessageType();
 			
-//			if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
-//				Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-//				String sequenceID = sequence.getIdentifier().getIdentifier();
-//			}
-
-//			if (AcknowledgementManager.ackRequired (rmMsgCtx)) {
-//				RMMsgCreator.addAckMessage(rmMsgCtx);
-			
-			//} else 
-				
 			if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks
 				// checking weather this message can carry piggybacked acks
 				// piggybacking if an ack if available for the same
@@ -332,12 +330,73 @@
 			log.debug("Exit: SenderWorker::run");
 	}
 	
+	/**
+	 * Update the message before sending it. We adjust the retransmission intervals and send counts
+	 * for the message. If the message is an application message then we ensure that we have added
+	 * the Sequence header.
+	 */
 	private boolean updateMessage(RMMsgContext rmMsgContext, SenderBean senderBean, StorageManager storageManager) throws AxisFault {
 		
 		boolean continueSending = MessageRetransmissionAdjuster.adjustRetransmittion(
 				rmMsgContext, senderBean, rmMsgContext.getConfigurationContext(), storageManager);
+		if(!continueSending) return false;
+		
+		Identifier id = null;
+
+		if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+			RMSequenceBean bean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, senderBean.getSequenceID());
+			String namespace = SpecSpecificConstants.getRMNamespaceValue(bean.getRMVersion());
+			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+			if(sequence == null) {
+				sequence = new Sequence(namespace);
+				
+				MessageNumber msgNumber = new MessageNumber(namespace);
+				msgNumber.setMessageNumber(senderBean.getMessageNumber());
+				sequence.setMessageNumber(msgNumber);
+
+				if(senderBean.isLastMessage() &&
+				   SpecSpecificConstants.isLastMessageIndicatorRequired(bean.getRMVersion())) {
+					sequence.setLastMessage(new LastMessage(namespace));
+				}
+				
+				// We just create the id here, we will add the value in later
+				id = new Identifier(namespace);
+				sequence.setIdentifier(id);
+				
+				rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
+			}
+			
+		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+			TerminateSequence terminate = (TerminateSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+			id = terminate.getIdentifier();
+
+		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
+			CloseSequence close = (CloseSequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+			id = close.getIdentifier();
+		
+		} else if(senderBean.getMessageType() == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+			// The only time that we can have a message of this type is when we are sending a
+			// stand-alone ack request, and in that case we only expect to find a single ack
+			// request header in the message.
+			Iterator ackRequests = rmMsgContext.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
+			AckRequested ackRequest = (AckRequested) ackRequests.next(); 
+			if (ackRequests.hasNext()) {
+				throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
+			}
+			id = ackRequest.getIdentifier();
+		}
+		
+		// TODO consider adding an extra ack request, as we are about to send the message and we
+		// know which sequence it is associated with.
+
+		if(id != null && !senderBean.getSequenceID().equals(id.getIdentifier())) {
+			id.setIndentifer(senderBean.getSequenceID());
+
+			// Write the changes back into the message context
+			rmMsgContext.addSOAPEnvelope();
+		}
 		
-		return continueSending;
+		return true;
 	}
 	
 	private boolean isAckPiggybackableMsgType(int messageType) {



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org