You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/11/21 06:06:06 UTC

svn commit: r345831 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: TerminateManager.java msgprocessors/TerminateSeqMsgProcessor.java msgreceivers/RMMessageReceiver.java workers/InOrderInvoker.java workers/Sender.java

Author: chamikara
Date: Sun Nov 20 21:05:32 2005
New Revision: 345831

URL: http://svn.apache.org/viewcvs?rev=345831&view=rev
Log:
TerminateManager was added.
It contains the logic to clean up the data saved for a sequence.
Currently, cleaning happens as follows:
   Sending side -> After sending the terminate seq message
   Receiving side -> 
            IF (NOT_INORDER_INVOCATION)
               at the TerminateSeqMsgProcessor
            ELSE
               at the TerminateSeqMsgProcessor + InOrderInvoker

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=345831&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Sun Nov 20 21:05:32 2005
@@ -0,0 +1,160 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.storage.beans.StorageMapBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class TerminateManager {
+
+	/**
+	 * Cleans up receiving-side data of a sequence: deletes every NextMsgBean found for the
+	 * sequence id and, unless in-order delivery is the default, runs terminateAfterInvocation.
+	 *
+	 * @param configContext configuration context used to look up the storage manager
+	 * @param sequenceID    identifier of the sequence being terminated
+	 * @throws SandeshaException if one of the called storage operations throws it
+	 */
+	public static void terminateReceivingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		NextMsgBeanMgr nextMsgMgr = SandeshaUtil.getSandeshaStorageManager(configContext).getNextMsgBeanMgr();
+		NextMsgBean searchTemplate = new NextMsgBean ();
+		searchTemplate.setSequenceId(sequenceID);
+		for (Iterator matches = nextMsgMgr.find(searchTemplate).iterator(); matches.hasNext();) {
+			nextMsgMgr.delete(((NextMsgBean) matches.next()).getSequenceId());
+		}
+		//with in-order delivery the remaining cleanup is triggered from the InOrderInvoker
+		if (Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE!=Constants.QOS.DeliveryAssurance.IN_ORDER)
+			terminateAfterInvocation(configContext,sequenceID);
+	}
+	
+	/**
+	 * Removes the data kept for a received sequence once its messages have been invoked:
+	 * deletes every StorageMapBean found for the sequence id and drops the sequence id from
+	 * the INCOMING_SEQUENCE_LIST property stored under ALL_SEQUENCES (when that list exists).
+	 *
+	 * @param configContext configuration context used to look up the storage manager
+	 * @param sequenceID    identifier of the sequence whose data is removed
+	 * @throws SandeshaException if one of the called storage operations throws it
+	 */
+	public static void terminateAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		StorageMapBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+
+		//removing storageMap entries
+		StorageMapBean findStorageMapBean = new StorageMapBean ();
+		findStorageMapBean.setSequenceId(sequenceID);
+		Iterator iterator = storageMapBeanMgr.find(findStorageMapBean).iterator();
+		while (iterator.hasNext()) {
+			StorageMapBean storageMapBean = (StorageMapBean) iterator.next();
+			storageMapBeanMgr.delete(storageMapBean.getKey());
+		}
+
+		//TODO - also remove the receiving side sequence properties that are not required
+		//for responses (find them by sequenceID and filter with isRequiredForResponseSide).
+
+		//retrieve() can return null (terminateSendingSide checks for it), so guard before use.
+		SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+		ArrayList allSequenceList = (allSequenceBean==null) ? null : (ArrayList) allSequenceBean.getValue();
+		//NOTE(review): the list is changed in place; confirm the storage manager persists this.
+		if (allSequenceList!=null)
+			allSequenceList.remove(sequenceID);
+	}
+	
+	/**
+	 * Tells whether the named sequence property must be kept for the response side; null yields false.
+	 * NOTE(review): every branch returns false, LAST_OUT_MESSAGE included - confirm the intent.
+	 */
+	private static boolean isRequiredForResponseSide (String name) {
+		if (name==null || name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))
+			return false;
+		return false;
+	}
+	
+	/**
+	 * Removes the data kept for the sending side of a sequence: retransmitter entries and
+	 * create-sequence entries found by the temp sequence id, then the sequence properties
+	 * stored under the temp sequence id and under the sequence id itself.
+	 *
+	 * @param configContext configuration context used to look up the storage manager
+	 * @param sequenceID    identifier of the sequence being terminated
+	 * @throws SandeshaException if no TEMP_SEQUENCE_ID property is stored for sequenceID
+	 */
+	public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		RetransmitterBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+		CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+		SequencePropertyBean tempSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Constants.SequenceProperties.TEMP_SEQUENCE_ID);
+		if (tempSequenceBean==null)
+			throw new SandeshaException ("TempSequence entry not found");
+		String tempSequenceId = (String) tempSequenceBean.getValue();
+
+		//removing retransmitterMgr entries
+		RetransmitterBean findRetransmitterBean = new RetransmitterBean ();
+		findRetransmitterBean.setTempSequenceId(tempSequenceId);
+		Iterator iterator = retransmitterBeanMgr.find(findRetransmitterBean).iterator();
+		while (iterator.hasNext()) {
+			RetransmitterBean retransmitterBean = (RetransmitterBean) iterator.next();
+			retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+		}
+
+		//removing the createSeqMgrEntry
+		CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
+		findCreateSequenceBean.setTempSequenceId(tempSequenceId);
+		iterator = createSeqBeanMgr.find(findCreateSequenceBean).iterator();
+		while (iterator.hasNext()) {
+			CreateSeqBean createSeqBean = (CreateSeqBean) iterator.next();
+			createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgId());
+		}
+
+		//removing sequence properties: first those keyed by the temp id, then those keyed by
+		//the sequence id (the second pass used to repeat the temp id and removed nothing more)
+		removeSequenceProperties(sequencePropertyBeanMgr, tempSequenceId);
+		removeSequenceProperties(sequencePropertyBeanMgr, sequenceID);
+	}
+
+	/** Deletes every sequence property the manager finds for the given sequence id. */
+	private static void removeSequenceProperties (SequencePropertyBeanMgr sequencePropertyBeanMgr, String id) throws SandeshaException {
+		SequencePropertyBean findSequencePropertyBean = new SequencePropertyBean ();
+		findSequencePropertyBean.setSequenceId(id);
+		Iterator iterator = sequencePropertyBeanMgr.find(findSequencePropertyBean).iterator();
+		while (iterator.hasNext()) {
+			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+			sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+		}
+	}
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Sun Nov 20 21:05:32 2005
@@ -18,11 +18,20 @@
 package org.apache.sandesha2.msgprocessors;
 
 import javax.xml.namespace.QName;
+
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
  * @author Chamikara Jayalath <ch...@gmail.com>
@@ -44,6 +53,19 @@
 
 		//Processing the terminate message
 		//TODO Add terminate sequence message logic.
+		TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMSg.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+		if (terminateSequence==null)
+			throw new SandeshaException ("Terminate Sequence part is not available");
+		
+		String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+		if (sequenceId==null || "".equals(sequenceId))
+			throw new SandeshaException ("Invalid sequence id");
+		
+		ConfigurationContext context = terminateSeqMsg.getSystemContext();
+
+		
+		TerminateManager.terminateReceivingSide(context,sequenceId);
+		
 
 		terminateSeqMsg.setPausedTrue(new QName(Constants.IN_HANDLER_NAME));
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Sun Nov 20 21:05:32 2005
@@ -21,6 +21,9 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * @author Chamikara Jayalath <ch...@gmail.com>
@@ -36,6 +39,9 @@
 
 	public final void receive(MessageContext messgeCtx) throws AxisFault {
 		System.out.println("RM MESSSAGE RECEIVER WAS CALLED");
+		// Initialize an RMMsgContext from the incoming message and print its message type.
+		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messgeCtx);
+		System.out.println("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
 	}
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Sun Nov 20 21:05:32 2005
@@ -27,6 +27,7 @@
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
@@ -101,7 +102,7 @@
 						.getValue();
 				Iterator seqPropIt = seqPropList.iterator();
 
-				while (seqPropIt.hasNext()) {
+				currentIteration: while (seqPropIt.hasNext()) {
 
 					String sequenceId = (String) seqPropIt.next();
 
@@ -168,10 +169,24 @@
 								.find(
 										new StorageMapBean(null, nextMsgno,
 												sequenceId)).iterator();
+
+						//terminate (AfterInvocation)
+						if (rmMsg.getMessageType() == Constants.MessageTypes.APPLICATION) {
+							Sequence sequence = (Sequence) rmMsg
+									.getMessagePart(Constants.MessageParts.SEQUENCE);
+							if (sequence.getLastMessage() != null) {
+								TerminateManager.terminateAfterInvocation(
+										context, sequenceId);
+								
+								//exit from current iteration. (since an entry was removed)
+								break currentIteration;
+							}
+						}
 					}
 
 					nextMsgBean.setNextMsgNoToProcess(nextMsgno);
 					nextMsgMgr.update(nextMsgBean);
+
 				}
 			} catch (SandeshaException e1) {
 				// TODO Auto-generated catch block

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun Nov 20 21:05:32 2005
@@ -28,12 +28,14 @@
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
 import org.apache.sandesha2.storage.beans.RetransmitterBean;
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
  * @author Chamikara Jayalath <ch...@gmail.com>
@@ -121,6 +123,16 @@
 
 						if (!msgCtx.isServerSide())
 							checkForSyncResponses(msgCtx);
+						
+						
+						if (rmMsgCtx.getMessageType()==Constants.MessageTypes.TERMINATE_SEQ) {
+							//terminate sending side.
+							TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+							String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+							ConfigurationContext configContext = msgCtx.getSystemContext();
+							
+							TerminateManager.terminateSendingSide(configContext,sequenceID);
+						}
 
 					} catch (AxisFault e1) {
 						e1.printStackTrace();



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org