You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/11/06 01:02:51 UTC

svn commit: r471577 - in /webservices/sandesha/trunk/java: src/org/apache/sandesha2/client/ src/org/apache/sandesha2/msgprocessors/ src/org/apache/sandesha2/polling/ src/org/apache/sandesha2/util/ test/src/org/apache/sandesha2/ test/src/org/apache/sand...

Author: chamikara
Date: Sun Nov  5 16:02:50 2006
New Revision: 471577

URL: http://svn.apache.org/viewvc?view=rev&rev=471577
Log:
Necessary updates from commit 471576 to the branch.

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Sun Nov  5 16:02:50 2006
@@ -327,11 +327,10 @@
 		}
 
 		// setting a new squenceKey if not already set.
-		String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-		if (sequenceKey == null) {
-			sequenceKey = SandeshaUtil.getUUID();
-			options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
-		}
+		String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+		String	newSequenceKey = SandeshaUtil.getUUID();
+		options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, newSequenceKey);
 
 		String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
 
@@ -356,10 +355,11 @@
 		options.setAction(oldAction);
 		
 		options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_FALSE);
-		
+		options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+				
 		//the generated sequenceKey will be returned. Client can use this to work with this newly generated sequence.
 		
-		return sequenceKey;
+		return newSequenceKey;
 	}
 
 	public static void createSequence(ServiceClient serviceClient, boolean offer, String sequenceKey)

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Sun Nov  5 16:02:50 2006
@@ -39,8 +39,10 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
@@ -179,6 +181,19 @@
 			long msgNo = nack.getNackNumber();
 
 			// TODO - Process Nack
+		}
+		
+		//adding a MakeConnection for the response sequence if needed.
+		String offeredSequenceId = SandeshaUtil.getSequenceProperty(sequencePropertyKey, 
+				Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE, storageManager);
+		if (offeredSequenceId!=null) {
+
+			NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+			NextMsgBean nextMsgBean = nextMsgBeanMgr.retrieve(outSequenceId);
+			
+			if (nextMsgBean!=null && nextMsgBean.isPollingMode())
+				SandeshaUtil.shedulePollingRequest(offeredSequenceId, configCtx);
+			
 		}
 
 		// setting acked message date.

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=auto&rev=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Sun Nov  5 16:02:50 2006
@@ -0,0 +1,52 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.MessagePending;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class MessagePendingProcessor {
+
+	private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
+	
+	public boolean processMessagePendingHeaders (MessageContext message) throws AxisFault {
+		
+		if (log.isDebugEnabled())
+			log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
+
+		boolean messagePaused = false;
+		
+		RMMsgContext rmMsgContext = MsgInitializer.initializeMessage(message);
+		Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		MessagePending messagePending = (MessagePending) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+		
+		if (sequence!=null) {
+			String sequenceId = sequence.getIdentifier().getIdentifier();
+			
+			if (messagePending!=null) {
+				boolean pending = messagePending.isPending();
+				if (pending) {
+					PollingManager pollingManager = SandeshaUtil.getPollingManager(message.getConfigurationContext());
+					if (pollingManager!=null) {
+						pollingManager.shedulePollingRequest(sequenceId);
+					}
+				}
+			}
+		}
+			
+		
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
+
+		return messagePaused;
+	}
+
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Sun Nov  5 16:02:50 2006
@@ -29,7 +29,9 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
@@ -72,6 +74,17 @@
 
 		ConfigurationContext configContext = msgContext.getConfigurationContext();
 
+		//shedulling a polling request for the response side.
+		String offeredSequenceId = SandeshaUtil.getSequenceProperty(sequencePropertyKey, 
+				Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE, storageManager);
+		
+		if (offeredSequenceId!=null) {
+			NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+			NextMsgBean nextMsgBean = nextMsgBeanMgr.retrieve(sequenceId);
+			
+			if (nextMsgBean!=null && nextMsgBean.isPollingMode())
+				SandeshaUtil.shedulePollingRequest(offeredSequenceId, configContext);
+		}
 
 		TerminateManager.terminateSendingSide (configContext, sequencePropertyKey,internalSequenceID, msgContext.isServerSide(),
 				storageManager);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Sun Nov  5 16:02:50 2006
@@ -18,6 +18,7 @@
 package org.apache.sandesha2.polling;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 
@@ -55,37 +56,29 @@
 	/**
 	 * By adding an entry to this, the PollingManager will be asked to do a polling request on this sequence.
 	 */
-	private ArrayList sheduledPollingRequests = null;
+	private HashMap sheduledPollingRequests = null;
 	
-	private final int POLLING_MANAGER_WAIT_TIME = 5000;
+	private final int POLLING_MANAGER_WAIT_TIME = 3000;
 	
 	public void run() {
 		
+		
 		while (isPoll()) {
 			
 			try {
 				
-				try {
-					Thread.sleep(POLLING_MANAGER_WAIT_TIME);
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-				
 				NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
 				
 				//geting the sequences to be polled.
 				//if shedule contains any requests, do the earliest one.
 				//else pick one randomly.
 				
-				String sequenceId = null;
-				if (sheduledPollingRequests.size()>0) {
-					sequenceId = (String )sheduledPollingRequests.get(0);
-					sheduledPollingRequests.remove(0);
-				}
+				String sequenceId = getNextSheduleEntry ();
 
 				NextMsgBean nextMsgBean = null;
 				
 				if (sequenceId==null) {
+					
 					NextMsgBean findBean = new NextMsgBean ();
 					findBean.setPollingMode(true);
 					
@@ -97,6 +90,8 @@
 						nextMsgBean = (NextMsgBean) results.get(item);
 					}
 					
+					
+					
 				} else {
 					NextMsgBean findBean = new NextMsgBean ();
 					findBean.setPollingMode(true);
@@ -173,6 +168,22 @@
 		}
 	}
 	
+	private synchronized String getNextSheduleEntry () {
+		String sequenceId = null;
+		
+		if (sheduledPollingRequests.size()>0) {
+			sequenceId = (String) sheduledPollingRequests.keySet().iterator().next();
+			Integer sequencEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
+			
+			Integer leftCount = new Integer (sequencEntryCount.intValue() -1 );
+			if (leftCount.intValue()==0) 
+				sheduledPollingRequests.remove(sequenceId);
+			
+		}
+		
+		return sequenceId;
+	}
+	
 	/**
 	 * Starts the PollingManager.
 	 * 
@@ -181,7 +192,7 @@
 	 */
 	public synchronized void start (ConfigurationContext configurationContext) throws SandeshaException {
 		this.configurationContext = configurationContext;
-		this.sheduledPollingRequests = new ArrayList ();
+		this.sheduledPollingRequests = new HashMap ();
 		this.storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
 		setPoll(true);
 		super.start();
@@ -213,9 +224,19 @@
 	 * 
 	 * @param sequenceId
 	 */
-	public synchronized void shedulePollingRequest (String internalSequenceId) {
-		if (!sheduledPollingRequests.contains(internalSequenceId))
-			sheduledPollingRequests.add(internalSequenceId);
+	public synchronized void shedulePollingRequest (String sequenceId) {
+		
+		System.out.println("Polling request sheduled for sequence:" + sequenceId);
+		
+		if (sheduledPollingRequests.containsKey (sequenceId)) {
+			Integer sequenceEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
+			Integer newCount = new Integer (sequenceEntryCount.intValue()+1);
+			sheduledPollingRequests.put(sequenceId,newCount);
+		} else {
+			Integer sequenceEntryCount = new Integer (1);
+			sheduledPollingRequests.put(sequenceId, sequenceEntryCount);
+		}
+		
 	}
 
 	

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Sun Nov  5 16:02:50 2006
@@ -1156,6 +1156,18 @@
     String stackTrace = baos.toString();
     return stackTrace;
   }
+  
+	public static PollingManager getPollingManager (ConfigurationContext configurationContext) {
+		PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
+				Sandesha2Constants.POLLING_MANAGER);
+		
+		return pollingManager;
+	}
+	
+	public static void shedulePollingRequest (String sequenceId, ConfigurationContext configurationContext) throws SandeshaException { 
+		PollingManager pollingManager = getPollingManager(configurationContext);
+		pollingManager.shedulePollingRequest(sequenceId);
+	}
 
 	
 }

Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java Sun Nov  5 16:02:50 2006
@@ -113,8 +113,8 @@
 		clientOptions.setTo(new EndpointReference (to));
 		clientOptions.setProperty(Configuration.TRANSPORT_URL,transportTo);
 		
-		String sequenceKey = SandeshaUtil.getUUID();
-		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+//		String sequenceKey = SandeshaUtil.getUUID();
+//		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
 		
 		ServiceClient serviceClient = new ServiceClient (configContext,null);
 		
@@ -133,7 +133,8 @@
 		
 		serviceClient.setOptions(clientOptions);
 		
-		SandeshaClient.createSequence(serviceClient,true);
+		String sequenceKey = SandeshaClient.createSequence(serviceClient,true);
+		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
 		
 		Thread.sleep(10000);
 		

Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java Sun Nov  5 16:02:50 2006
@@ -92,17 +92,21 @@
 		
 		ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
 		ServiceClient serviceClient = new ServiceClient (configContext,null);
-		String sequenceKey = SandeshaUtil.getUUID();
+		
+//		String sequenceKey = SandeshaUtil.getUUID();
 
 		Options clientOptions = new Options ();
 
 		clientOptions.setTo(new EndpointReference (to));
 		clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,to);
-		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+
+//		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+		
 		clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, spec);
 		serviceClient.setOptions(clientOptions);
 		
-		SandeshaClient.createSequence(serviceClient,false);
+		String sequenceKey = SandeshaClient.createSequence(serviceClient,false);
+		clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
 		
 		SequenceReport sequenceReport = null;
 		for(int i = 0; i < 15; i++) {



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org