You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/11/21 06:06:06 UTC
svn commit: r345831 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: TerminateManager.java
msgprocessors/TerminateSeqMsgProcessor.java
msgreceivers/RMMessageReceiver.java workers/InOrderInvoker.java
workers/Sender.java
Author: chamikara
Date: Sun Nov 20 21:05:32 2005
New Revision: 345831
URL: http://svn.apache.org/viewcvs?rev=345831&view=rev
Log:
TerminateManager was added.
this has logic to clean a data saved due to a sequence.
currently cleaning happens as following
Sending side -> After sending the terminate seq message
Receiving side ->
IF (NOT_INORDER_INVOCATION)
at the TerminateMsgProcessor
ELSE
at the terminateMsgProcessor + InOrderInvoker
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=345831&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Sun Nov 20 21:05:32 2005
@@ -0,0 +1,160 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.storage.beans.StorageMapBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class TerminateManager {
+
+ public static void terminateReceivingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+
+ //removing nextMsgMgr entries
+ NextMsgBean findNextMsgBean = new NextMsgBean ();
+ findNextMsgBean.setSequenceId(sequenceID);
+ Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
+ nextMsgBeanMgr.delete(nextMsgBean.getSequenceId());
+ }
+
+ if(Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE!=Constants.QOS.DeliveryAssurance.IN_ORDER) {
+ terminateAfterInvocation(configContext,sequenceID);
+ }
+
+ }
+
+ public static void terminateAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+ StorageMapBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
+
+ //removing storageMap entries
+ StorageMapBean findStorageMapBean = new StorageMapBean ();
+ findStorageMapBean.setSequenceId(sequenceID);
+ Collection collection = storageMapBeanMgr.find(findStorageMapBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ StorageMapBean storageMapBean = (StorageMapBean) iterator.next();
+ storageMapBeanMgr.delete(storageMapBean.getKey());
+ }
+
+ //TODO - refine below (removing sequence properties of the receiving side).
+ //removing sequencePropertyEntries.
+// SequencePropertyBean findSequencePropBean = new SequencePropertyBean ();
+// findSequencePropBean.setSequenceId(sequenceID);
+// collection = sequencePropertyBeanMgr.find(findSequencePropBean);
+// iterator = collection.iterator();
+// while (iterator.hasNext()) {
+// SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+// boolean propertyRequiredForResponses = isRequiredForResponseSide (sequencePropertyBean.getName());
+// if (!propertyRequiredForResponses)
+// sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+// }
+
+ SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ ArrayList allSequenceList = (ArrayList) allSequenceBean.getValue();
+
+ allSequenceList.remove(sequenceID);
+ }
+
+ private static boolean isRequiredForResponseSide (String name) {
+ if (name==null && name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ return false;
+
+ if (name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ return false;
+
+ return false;
+ }
+
+ public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+ RetransmitterBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
+ CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+
+ SequencePropertyBean tempSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Constants.SequenceProperties.TEMP_SEQUENCE_ID);
+ if (tempSequenceBean==null)
+ throw new SandeshaException ("TempSequence entry not found");
+
+ String tempSequenceId = (String) tempSequenceBean.getValue();
+
+ //removing retransmitterMgr entries
+ RetransmitterBean findRetransmitterBean = new RetransmitterBean ();
+ findRetransmitterBean.setTempSequenceId(tempSequenceId);
+ Collection collection = retransmitterBeanMgr.find(findRetransmitterBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ RetransmitterBean retransmitterBean = (RetransmitterBean) iterator.next();
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+ }
+
+ //removing the createSeqMgrEntry
+ CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
+ findCreateSequenceBean.setTempSequenceId(tempSequenceId);
+ collection = createSeqBeanMgr.find(findCreateSequenceBean);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ CreateSeqBean createSeqBean = (CreateSeqBean) iterator.next();
+ createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgId());
+ }
+
+ //removing sequence properties
+ SequencePropertyBean findSequencePropertyBean1 = new SequencePropertyBean ();
+ findSequencePropertyBean1.setSequenceId(tempSequenceId);
+ collection = sequencePropertyBeanMgr.find(findSequencePropertyBean1);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+ sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+ }
+
+ SequencePropertyBean findSequencePropertyBean2 = new SequencePropertyBean ();
+ findSequencePropertyBean2.setSequenceId(tempSequenceId);
+ collection = sequencePropertyBeanMgr.find(findSequencePropertyBean2);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
+ sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+ }
+
+ }
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Sun Nov 20 21:05:32 2005
@@ -18,11 +18,20 @@
package org.apache.sandesha2.msgprocessors;
import javax.xml.namespace.QName;
+
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* @author Chamikara Jayalath <ch...@gmail.com>
@@ -44,6 +53,19 @@
//Processing the terminate message
//TODO Add terminate sequence message logic.
+ TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMSg.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+ if (terminateSequence==null)
+ throw new SandeshaException ("Terminate Sequence part is not available");
+
+ String sequenceId = terminateSequence.getIdentifier().getIdentifier();
+ if (sequenceId==null || "".equals(sequenceId))
+ throw new SandeshaException ("Invalid sequence id");
+
+ ConfigurationContext context = terminateSeqMsg.getSystemContext();
+
+
+ TerminateManager.terminateReceivingSide(context,sequenceId);
+
terminateSeqMsg.setPausedTrue(new QName(Constants.IN_HANDLER_NAME));
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Sun Nov 20 21:05:32 2005
@@ -21,6 +21,9 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* @author Chamikara Jayalath <ch...@gmail.com>
@@ -36,6 +39,9 @@
public final void receive(MessageContext messgeCtx) throws AxisFault {
System.out.println("RM MESSSAGE RECEIVER WAS CALLED");
+
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messgeCtx);
+ System.out.println("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Sun Nov 20 21:05:32 2005
@@ -27,6 +27,7 @@
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
@@ -101,7 +102,7 @@
.getValue();
Iterator seqPropIt = seqPropList.iterator();
- while (seqPropIt.hasNext()) {
+ currentIteration: while (seqPropIt.hasNext()) {
String sequenceId = (String) seqPropIt.next();
@@ -168,10 +169,24 @@
.find(
new StorageMapBean(null, nextMsgno,
sequenceId)).iterator();
+
+ //terminate (AfterInvocation)
+ if (rmMsg.getMessageType() == Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsg
+ .getMessagePart(Constants.MessageParts.SEQUENCE);
+ if (sequence.getLastMessage() != null) {
+ TerminateManager.terminateAfterInvocation(
+ context, sequenceId);
+
+ //exit from current iteration. (since an entry was removed)
+ break currentIteration;
+ }
+ }
}
nextMsgBean.setNextMsgNoToProcess(nextMsgno);
nextMsgMgr.update(nextMsgBean);
+
}
} catch (SandeshaException e1) {
// TODO Auto-generated catch block
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun Nov 20 21:05:32 2005
@@ -28,12 +28,14 @@
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* @author Chamikara Jayalath <ch...@gmail.com>
@@ -121,6 +123,16 @@
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
+
+
+ if (rmMsgCtx.getMessageType()==Constants.MessageTypes.TERMINATE_SEQ) {
+ //terminate sending side.
+ TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID = terminateSequence.getIdentifier().getIdentifier();
+ ConfigurationContext configContext = msgCtx.getSystemContext();
+
+ TerminateManager.terminateSendingSide(configContext,sequenceID);
+ }
} catch (AxisFault e1) {
e1.printStackTrace();
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org