You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/11/06 01:02:51 UTC
svn commit: r471577 - in /webservices/sandesha/trunk/java:
src/org/apache/sandesha2/client/ src/org/apache/sandesha2/msgprocessors/
src/org/apache/sandesha2/polling/ src/org/apache/sandesha2/util/
test/src/org/apache/sandesha2/ test/src/org/apache/sand...
Author: chamikara
Date: Sun Nov 5 16:02:50 2006
New Revision: 471577
URL: http://svn.apache.org/viewvc?view=rev&rev=471577
Log:
Necessary updates from commit 471576 to the branch.
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Sun Nov 5 16:02:50 2006
@@ -327,11 +327,10 @@
}
// setting a freshly generated sequenceKey for this call; the caller's previous key (if any) is restored before returning.
- String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
- if (sequenceKey == null) {
- sequenceKey = SandeshaUtil.getUUID();
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
- }
+ String oldSequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+
+ String newSequenceKey = SandeshaUtil.getUUID();
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, newSequenceKey);
String rmSpecVersion = (String) options.getProperty(SandeshaClientConstants.RM_SPEC_VERSION);
@@ -356,10 +355,11 @@
options.setAction(oldAction);
options.setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_FALSE);
-
+ options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, oldSequenceKey);
+
//the generated sequenceKey will be returned. Client can use this to work with this newly generated sequence.
- return sequenceKey;
+ return newSequenceKey;
}
public static void createSequence(ServiceClient serviceClient, boolean offer, String sequenceKey)
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Sun Nov 5 16:02:50 2006
@@ -39,8 +39,10 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
@@ -179,6 +181,19 @@
long msgNo = nack.getNackNumber();
// TODO - Process Nack
+ }
+
+ //adding a MakeConnection for the response sequence if needed.
+ String offeredSequenceId = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE, storageManager);
+ if (offeredSequenceId!=null) {
+
+ NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+ NextMsgBean nextMsgBean = nextMsgBeanMgr.retrieve(outSequenceId);
+
+ if (nextMsgBean!=null && nextMsgBean.isPollingMode())
+ SandeshaUtil.shedulePollingRequest(offeredSequenceId, configCtx);
+
}
// setting acked message date.
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=auto&rev=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Sun Nov 5 16:02:50 2006
@@ -0,0 +1,52 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.MessagePending;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class MessagePendingProcessor {
+
+    private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
+
+    /**
+     * Looks at the Sequence and MessagePending parts of the given message and,
+     * when the MessagePending part reports pending messages, asks the
+     * PollingManager found on the message's ConfigurationContext to poll the
+     * sequence named by the Sequence part.
+     *
+     * @param message the message context to inspect
+     * @return always false; this method never marks the message as paused
+     * @throws AxisFault propagated from the helpers called here
+     */
+    public boolean processMessagePendingHeaders (MessageContext message) throws AxisFault {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
+        }
+
+        RMMsgContext rmMsgContext = MsgInitializer.initializeMessage(message);
+        Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+        MessagePending messagePending = (MessagePending) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+
+        if (sequence != null) {
+            // The identifier is resolved before MessagePending is examined, so
+            // it is read for every message that carries a Sequence part.
+            String sequenceId = sequence.getIdentifier().getIdentifier();
+
+            boolean pollRequested = (messagePending != null) && messagePending.isPending();
+            if (pollRequested) {
+                PollingManager pollingManager = SandeshaUtil.getPollingManager(message.getConfigurationContext());
+                // Without a registered PollingManager there is nothing to schedule.
+                if (pollingManager != null) {
+                    pollingManager.shedulePollingRequest(sequenceId);
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
+        }
+
+        return false;
+    }
+
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Sun Nov 5 16:02:50 2006
@@ -29,7 +29,9 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
@@ -72,6 +74,17 @@
ConfigurationContext configContext = msgContext.getConfigurationContext();
+ //scheduling a polling request for the response side.
+ String offeredSequenceId = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE, storageManager);
+
+ if (offeredSequenceId!=null) {
+ NextMsgBeanMgr nextMsgBeanMgr = storageManager.getNextMsgBeanMgr();
+ NextMsgBean nextMsgBean = nextMsgBeanMgr.retrieve(sequenceId);
+
+ if (nextMsgBean!=null && nextMsgBean.isPollingMode())
+ SandeshaUtil.shedulePollingRequest(offeredSequenceId, configContext);
+ }
TerminateManager.terminateSendingSide (configContext, sequencePropertyKey,internalSequenceID, msgContext.isServerSide(),
storageManager);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Sun Nov 5 16:02:50 2006
@@ -18,6 +18,7 @@
package org.apache.sandesha2.polling;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Random;
@@ -55,37 +56,29 @@
/**
* By adding an entry to this, the PollingManager will be asked to do a polling request on this sequence.
*/
- private ArrayList sheduledPollingRequests = null;
+ private HashMap sheduledPollingRequests = null;
- private final int POLLING_MANAGER_WAIT_TIME = 5000;
+ private final int POLLING_MANAGER_WAIT_TIME = 3000;
public void run() {
+
while (isPoll()) {
try {
- try {
- Thread.sleep(POLLING_MANAGER_WAIT_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
//getting the sequence to be polled.
//if the schedule contains any requests, take one of them (HashMap iteration order, not necessarily the earliest).
//else pick one randomly.
- String sequenceId = null;
- if (sheduledPollingRequests.size()>0) {
- sequenceId = (String )sheduledPollingRequests.get(0);
- sheduledPollingRequests.remove(0);
- }
+ String sequenceId = getNextSheduleEntry ();
NextMsgBean nextMsgBean = null;
if (sequenceId==null) {
+
NextMsgBean findBean = new NextMsgBean ();
findBean.setPollingMode(true);
@@ -97,6 +90,8 @@
nextMsgBean = (NextMsgBean) results.get(item);
}
+
+
} else {
NextMsgBean findBean = new NextMsgBean ();
findBean.setPollingMode(true);
@@ -173,6 +168,22 @@
}
}
+ /**
+  * Takes one pending polling request off the schedule.
+  *
+  * Each key of sheduledPollingRequests is a sequence id and each value is the
+  * number of polling requests still outstanding for it. One request is
+  * consumed per call: the count is decremented and written back, and the
+  * entry is removed once no requests are left. The schedule is a HashMap, so
+  * which of several scheduled sequences is returned is not defined.
+  *
+  * @return the id of a sequence with a scheduled polling request, or null
+  *         when nothing is scheduled
+  */
+ private synchronized String getNextSheduleEntry () {
+
+     if (sheduledPollingRequests.isEmpty()) {
+         return null;
+     }
+
+     String sequenceId = (String) sheduledPollingRequests.keySet().iterator().next();
+     Integer sequenceEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
+
+     int leftCount = sequenceEntryCount.intValue() - 1;
+     if (leftCount <= 0) {
+         sheduledPollingRequests.remove(sequenceId);
+     } else {
+         // Store the decremented count; otherwise an entry scheduled more than
+         // once would never reach zero and would be returned on every call.
+         sheduledPollingRequests.put(sequenceId, new Integer(leftCount));
+     }
+
+     return sequenceId;
+ }
+
/**
* Starts the PollingManager.
*
@@ -181,7 +192,7 @@
*/
public synchronized void start (ConfigurationContext configurationContext) throws SandeshaException {
this.configurationContext = configurationContext;
- this.sheduledPollingRequests = new ArrayList ();
+ this.sheduledPollingRequests = new HashMap ();
this.storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
setPoll(true);
super.start();
@@ -213,9 +224,19 @@
*
* @param sequenceId
*/
- public synchronized void shedulePollingRequest (String internalSequenceId) {
- if (!sheduledPollingRequests.contains(internalSequenceId))
- sheduledPollingRequests.add(internalSequenceId);
+ public synchronized void shedulePollingRequest (String sequenceId) {
+
+     // NOTE(review): debug trace written straight to stdout; consider routing
+     // it through the class's logger if one is available.
+     System.out.println("Polling request sheduled for sequence:" + sequenceId);
+
+     // Requests already queued for this sequence; no entry means none yet.
+     Integer queuedCount = (Integer) sheduledPollingRequests.get(sequenceId);
+     int updatedCount = (queuedCount == null) ? 1 : queuedCount.intValue() + 1;
+     sheduledPollingRequests.put(sequenceId, new Integer(updatedCount));
+
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Sun Nov 5 16:02:50 2006
@@ -1156,6 +1156,18 @@
String stackTrace = baos.toString();
return stackTrace;
}
+
+ /**
+  * Reads the PollingManager stored on the given configuration context under
+  * the Sandesha2Constants.POLLING_MANAGER property.
+  *
+  * @param configurationContext the context whose property is read
+  * @return the value of that property cast to PollingManager; callers such as
+  *         MessagePendingProcessor treat the result as possibly null
+  */
+ public static PollingManager getPollingManager (ConfigurationContext configurationContext) {
+     return (PollingManager) configurationContext.getProperty(Sandesha2Constants.POLLING_MANAGER);
+ }
+
+ /**
+  * Schedules a polling request for the given sequence on the PollingManager
+  * obtained from the configuration context.
+  *
+  * @param sequenceId the id of the sequence to be polled
+  * @param configurationContext the context the PollingManager is looked up on
+  * @throws SandeshaException if no PollingManager is available on the context
+  */
+ public static void shedulePollingRequest (String sequenceId, ConfigurationContext configurationContext) throws SandeshaException {
+     PollingManager pollingManager = getPollingManager(configurationContext);
+     if (pollingManager == null) {
+         // Fail with the declared exception type and a usable message instead
+         // of a bare NullPointerException on the call below.
+         throw new SandeshaException("Cannot schedule a polling request for sequence '" + sequenceId
+                 + "': no PollingManager is available on the ConfigurationContext");
+     }
+     pollingManager.shedulePollingRequest(sequenceId);
+ }
}
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java Sun Nov 5 16:02:50 2006
@@ -113,8 +113,8 @@
clientOptions.setTo(new EndpointReference (to));
clientOptions.setProperty(Configuration.TRANSPORT_URL,transportTo);
- String sequenceKey = SandeshaUtil.getUUID();
- clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+// String sequenceKey = SandeshaUtil.getUUID();
+// clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
ServiceClient serviceClient = new ServiceClient (configContext,null);
@@ -133,7 +133,8 @@
serviceClient.setOptions(clientOptions);
- SandeshaClient.createSequence(serviceClient,true);
+ String sequenceKey = SandeshaClient.createSequence(serviceClient,true);
+ clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
Thread.sleep(10000);
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java?view=diff&rev=471577&r1=471576&r2=471577
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/SecurityTest.java Sun Nov 5 16:02:50 2006
@@ -92,17 +92,21 @@
ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
ServiceClient serviceClient = new ServiceClient (configContext,null);
- String sequenceKey = SandeshaUtil.getUUID();
+
+// String sequenceKey = SandeshaUtil.getUUID();
Options clientOptions = new Options ();
clientOptions.setTo(new EndpointReference (to));
clientOptions.setProperty(MessageContextConstants.TRANSPORT_URL,to);
- clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+
+// clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+
clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, spec);
serviceClient.setOptions(clientOptions);
- SandeshaClient.createSequence(serviceClient,false);
+ String sequenceKey = SandeshaClient.createSequence(serviceClient,false);
+ clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
SequenceReport sequenceReport = null;
for(int i = 0; i < 15; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org