You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/12/28 19:41:08 UTC

svn commit: r359586 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ storage/inmemory/ util/

Author: chamikara
Date: Wed Dec 28 10:40:22 2005
New Revision: 359586

URL: http://svn.apache.org/viewcvs?rev=359586&view=rev
Log:
Added to functionality get an Report explainint the status of a sequence. Currently this has two functions.
1. To know weather a sequence has been completed (i.e. all messages have been received and terminate has been sent).
2. To get the number of messages that has been acked upto the current time.

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java?rev=359586&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMReport.java Wed Dec 28 10:40:22 2005
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sandesha2;
+
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+public class RMReport {
+	
+	private long ackedMessageCount = 0;
+	private boolean sequenceCompleted = false;
+	
+	
+
+	public long getAckedMessageCount() {
+		return ackedMessageCount;
+	}
+	
+	public void setAckedMessageCount(long ackedMessageCount) {
+		this.ackedMessageCount = ackedMessageCount;
+	}
+	
+	public boolean isSequenceCompleted() {
+		return sequenceCompleted;
+	}
+	
+	public void setSequenceCompleted(boolean sequenceCompleted) {
+		this.sequenceCompleted = sequenceCompleted;
+	}
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2ClientAPI.java Wed Dec 28 10:40:22 2005
@@ -16,6 +16,10 @@
 
 package org.apache.sandesha2;
 
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
 /**
  * Contains all the Sandesha2Constants of Sandesha2.
  * Please see sub-interfaces to see grouped data.
@@ -23,12 +27,22 @@
  * @author Chamikara Jayalath <ch...@gmail.com>
  */
 
-public interface Sandesha2ClientAPI {
+public class Sandesha2ClientAPI {
 
-	String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
-	String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
-	String OFFERED_SEQUENCE_ID = "Sandesha2ClientAPIPropertyOfferedSequenceId";
-	String SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode";
-	String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
+	public static String AcksTo = "Sandesha2ClientAPIPropertyAcksTo";
+	public static String LAST_MESSAGE = "Sandesha2ClientAPIPropertyWSRMLastMessage";
+	public static String OFFERED_SEQUENCE_ID = "Sandesha2ClientAPIPropertyOfferedSequenceId";
+	public static String SANDESHA_DEBUG_MODE = "Sandesha2ClientAPIPropertyDebugMode";
+	public static String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
 
+	public static RMReport getRMReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
+		
+		String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
+		
+		RMReport rmReport = new RMReport ();
+		rmReport.setAckedMessageCount(SequenceManager.getAckedMessageCount (internalSequenceID,configurationContext));
+		rmReport.setSequenceCompleted(SequenceManager.isSequenceCompleted (internalSequenceID,configurationContext));
+		
+		return rmReport;
+	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Wed Dec 28 10:40:22 2005
@@ -180,6 +180,10 @@
 		String TERMINATE_ADDED = "TerminateAdded";
 		
 		String LAST_ACTIVATED_TIME = "LastActivatedTime";
+		
+		String NO_OF_MSGS_ACKED = "NoOfMessagesAcked";
+		
+		String TRANSPORT_TO = "TransportTo";
 	}
 
 	public interface SOAPVersion {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Wed Dec 28 10:40:22 2005
@@ -151,7 +151,9 @@
 		Collection collection = retransmitterBeanMgr.find(internalSequenceId);
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
-			SenderBean retransmitterBean = (SenderBean) iterator.next();
+			Object obj = iterator.next();
+			System.out.println(obj);
+			SenderBean retransmitterBean = (SenderBean) obj;
 			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
 		}
 		
@@ -183,6 +185,20 @@
 			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
 			sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
 		}
+	}
+	
+	private boolean isProportyDeletable (String name) {
+		boolean deleatable = true;
 		
+		if (Sandesha2Constants.SequenceProperties.TERMINATE_ADDED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED.equals(name))
+			deleatable = false;
+		
+		if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
+			deleatable = false;
+		
+		return deleatable;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Wed Dec 28 10:40:22 2005
@@ -168,12 +168,12 @@
 				throw new AxisFault(
 						"TO End Point Reference is not set correctly. This is a must for the sandesha client side.");
 
-			internalSequenceId = toEPR.getAddress();
+			String to = toEPR.getAddress();
 			OperationContext opContext = msgCtx.getOperationContext();
 			String sequenceKey = (String) msgCtx
 					.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
-			if (sequenceKey != null)
-				internalSequenceId = internalSequenceId + sequenceKey;
+			
+			internalSequenceId = SandeshaUtil.getInternalSequenceID(to,sequenceKey);
 
 		}
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Wed Dec 28 10:40:22 2005
@@ -25,6 +25,7 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.MessageContextConstants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
@@ -133,6 +134,28 @@
 			//TODO - Process Nack
 		}
 
+		
+		//setting acked message date.
+		//TODO add details specific to each message.
+		long noOfMsgsAcked = getNoOfMessagesAcked(sequenceAck.getAcknowledgementRanges().iterator());
+		SequencePropertyBean ackedMessagesBean = seqPropMgr.retrieve(outSequenceId,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+		boolean added = false;
+		
+		if (ackedMessagesBean==null) {
+			added = true;
+			ackedMessagesBean = new SequencePropertyBean ();
+			ackedMessagesBean.setSequenceID(outSequenceId);
+			ackedMessagesBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+		}
+		
+		ackedMessagesBean.setValue(Long.toString(noOfMsgsAcked));
+		
+		if (added) 
+			seqPropMgr.insert(ackedMessagesBean);
+		else
+			seqPropMgr.update(ackedMessagesBean);
+		
+		
 		//following get called in the SandesaInHandler
 		//if (justSendTerminateIfNeeded) {
 		//If all messages up to last message have been acknowledged.
@@ -160,6 +183,8 @@
 			}
 		}
 		
+	
+		
 		//stopping the progress of the message further.
 		//rmMsgCtx.getMessageContext().pause();
 		rmMsgCtx.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
@@ -222,6 +247,11 @@
 		terminateRMMessage
 				.setSOAPAction(Sandesha2Constants.WSRM.Actions.SOAP_ACTION_TERMINATE_SEQUENCE);
 
+		SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+		if (transportToBean!=null) {
+			terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+		}
+		
 		try {
 			terminateRMMessage.addSOAPEnvelope();
 		} catch (AxisFault e) {
@@ -246,15 +276,34 @@
 		SenderBeanMgr retramsmitterMgr = storageManager
 				.getRetransmitterBeanMgr();
 
+		
+
+		retramsmitterMgr.insert(terminateBean);
+		
 		SequencePropertyBean terminateAdded = new SequencePropertyBean();
 		terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
 		terminateAdded.setSequenceID(outSequenceId);
 		terminateAdded.setValue("true");
 
 		seqPropMgr.insert(terminateAdded);
+		
+		
 
-		retramsmitterMgr.insert(terminateBean);
-
+	}
+	
+	private static long getNoOfMessagesAcked (Iterator ackRangeIterator) {
+		long noOfMsgs = 0;
+		while (ackRangeIterator.hasNext()) {
+			AcknowledgementRange acknowledgementRange = (AcknowledgementRange) ackRangeIterator.next();
+			long lower = acknowledgementRange.getLowerValue();
+			long upper = acknowledgementRange.getUpperValue();
+			
+			for (long i=lower;i<=upper;i++) {
+				noOfMsgs++;
+			}
+		}
+		
+		return noOfMsgs;
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Wed Dec 28 10:40:22 2005
@@ -71,7 +71,7 @@
 		while (iterator.hasNext()) {
 			SenderBean senderBean = (SenderBean) table.get(iterator.next());
 			if (internalSequenceID.equals(senderBean.getInternalSequenceID())) 
-					arrayList.add(internalSequenceID);
+					arrayList.add(senderBean);
 		}
 		
 		return arrayList;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Wed Dec 28 10:40:22 2005
@@ -582,5 +582,16 @@
 		
 		return retArr;
 	}
+	
+	public static String getInternalSequenceID (String to, String sequenceKey) {
+		if (to==null && sequenceKey==null)
+			return null;
+		else if (to==null) 
+			return sequenceKey;
+		else if (sequenceKey==null)
+			return to;
+		else 
+			return to + ":" +sequenceKey;
+	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359586&r1=359585&r2=359586&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Wed Dec 28 10:40:22 2005
@@ -6,11 +6,15 @@
  */
 package org.apache.sandesha2.util;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -18,8 +22,10 @@
 import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.wsrm.CreateSequence;
@@ -110,7 +116,7 @@
 	}
 
 	public static void setupNewClientSequence(
-			MessageContext firstAplicationMsgCtx, String iternalSequenceId)
+			MessageContext firstAplicationMsgCtx, String internalSequenceId)
 			throws SandeshaException {
 
 		AbstractContext context = firstAplicationMsgCtx.getConfigurationContext();
@@ -130,7 +136,7 @@
 		if (toEPR == null)
 			throw new SandeshaException("WS-Addressing To is null");
 
-		SequencePropertyBean toBean = new SequencePropertyBean(iternalSequenceId,
+		SequencePropertyBean toBean = new SequencePropertyBean(internalSequenceId,
 				Sandesha2Constants.SequenceProperties.TO_EPR, toEPR.getAddress());
 
 		//Default value for acksTo is anonymous
@@ -139,10 +145,21 @@
 
 		EndpointReference acksToEPR = new EndpointReference(acksTo);
 		SequencePropertyBean acksToBean = new SequencePropertyBean(
-				iternalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
+				internalSequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR,
 				acksToEPR.getAddress());
 		seqPropMgr.insert(toBean);
 		seqPropMgr.insert(acksToBean);
+		
+		//saving transportTo value;
+		String transportTo = (String) firstAplicationMsgCtx.getProperty(MessageContextConstants.TRANSPORT_URL);
+		if (transportTo!=null) {
+			SequencePropertyBean transportToBean = new SequencePropertyBean ();
+			transportToBean.setSequenceID(internalSequenceId);
+			transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+			transportToBean.setValue(transportTo);
+			
+			seqPropMgr.insert(transportToBean);
+		}
 
 	}
 	
@@ -198,6 +215,9 @@
 			//loading default policies.
 			policyBean = PropertyManager.getInstance().getRMPolicyBean();
 		}
+		
+		if (policyBean.getInactiveTimeoutInterval()<=0)
+			return false;
 
 		boolean sequenceTimedOut = false;
 		
@@ -211,5 +231,63 @@
 		
 		return sequenceTimedOut;
 	}
+	
+	public static long getAckedMessageCount (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		Transaction transaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0)
+			throw new SandeshaException ("A sequence with give data has not been created");
+		
+		if (seqIDBeans.size()>1) 
+			throw new SandeshaException ("Sequence data is not unique. Cant generate report");
+		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+		String sequenceID = seqIDBean.getSequenceID();
+
+		SequencePropertyBean ackedMsgBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.NO_OF_MSGS_ACKED);
+		if (ackedMsgBean==null)
+			return 0; //No acknowledgement has been received yet.
+		
+		long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue());
+		
+		return noOfMessagesAcked;
+	}
+	
+	public static boolean isSequenceCompleted (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+		Transaction transaction = storageManager.getTransaction();
+		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+		
+		SequencePropertyBean findSeqIDBean = new SequencePropertyBean ();
+		findSeqIDBean.setValue(internalSequenceID);
+		findSeqIDBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+		Collection seqIDBeans = seqPropBeanMgr.find(findSeqIDBean);
+		
+		if (seqIDBeans.size()==0)
+			throw new SandeshaException ("A sequence with give data has not been created");
+		
+		if (seqIDBeans.size()>1) 
+			throw new SandeshaException ("Sequence data is not unique. Cant generate report");
+		
+		SequencePropertyBean seqIDBean = (SequencePropertyBean) seqIDBeans.iterator().next();
+		String sequenceID = seqIDBean.getSequenceID();
+		
+		SequencePropertyBean terminateAddedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+		if (terminateAddedBean==null)
+			return false;
+		
+		if ("true".equals(terminateAddedBean.getValue()))
+			return true;
+
+		return false;
+	}
+	
 	
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org