You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2006/12/20 16:20:38 UTC
svn commit: r489111 - in /webservices/sandesha/trunk/java:
src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
Author: gatfora
Date: Wed Dec 20 07:20:37 2006
New Revision: 489111
URL: http://svn.apache.org/viewvc?view=rev&rev=489111
Log:
Add an internalSequenceId to the SequenceAck message
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Wed Dec 20 07:20:37 2006
@@ -280,21 +280,10 @@
log.debug("Enter: SandeshaGlobalInHandler::processDroppedMessage");
if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
- Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- String sequenceId = null;
-
- if (sequence != null) {
- sequenceId = sequence.getIdentifier().getIdentifier();
- }
-
- SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
- SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
- Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
- String receivedMsgStr = receivedMsgsBean.getValue();
// Even though the duplicate message is dropped, hv to send the ack
// if needed.
- SequenceProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr, storageManager);
+ SequenceProcessor.sendAckIfNeeded(rmMsgContext, storageManager);
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Wed Dec 20 07:20:37 2006
@@ -263,7 +263,7 @@
}
// Sending acknowledgements
- sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+ sendAckIfNeeded(rmMsgCtx, storageManager);
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::processReliableMessage " + msgCtxPaused);
@@ -284,7 +284,7 @@
return false;
}
- public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
+ public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, StorageManager storageManager)
throws AxisFault {
if (log.isDebugEnabled())
@@ -351,6 +351,7 @@
ackBean.setMessageID(ackMsgCtx.getMessageID());
ackBean.setReSend(false);
ackBean.setSequenceID(sequencePropertyKey);
+ ackBean.setInternalSequenceID(sequencePropertyKey);
EndpointReference to = ackMsgCtx.getTo();
if (to!=null)
ackBean.setToAddress(to.getAddress());
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java Wed Dec 20 07:20:37 2006
@@ -10,11 +10,14 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SandeshaTestCase;
import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SequenceReport;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.RangeString;
@@ -112,7 +115,7 @@
}
- public void testForceInvokeWithDiscardGaps () throws AxisFault,InterruptedException {
+ public void testForceInvokeWithDiscardGaps () throws AxisFault {
String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
@@ -139,11 +142,12 @@
clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
serviceClient.fireAndForget(getPingOMBlock("ping3"));
- Thread.sleep(5000);
+ String internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+ waitForMessageToBeAcked(serviceClient, internalSequenceId);
StorageManager mgr = SandeshaUtil.getInMemoryStorageManager(configContext);
Transaction t = mgr.getTransaction();
- String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+ String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceId,
mgr);
t.commit();
@@ -164,4 +168,51 @@
}
+ /**
+ * Polls the outgoing sequence report until the highest outbound message number is listed in its completed messages, or until "waitTime" milliseconds have elapsed.
+ * @throws SandeshaException declared on the signature; presumably raised by the storage manager lookup - TODO confirm
+ */
+ private void waitForMessageToBeAcked(ServiceClient serviceClient, String internalSequenceId) throws SandeshaException
+ {
+ // Resolve the storage manager via the service client's configuration context
+ ConfigurationContext context = serviceClient.getServiceContext().getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+ // Get the sequence property bean manager
+ SequencePropertyBeanMgr beanMgr = storageManager.getSequencePropertyBeanMgr();
+
+ // Open a transaction around the property lookup
+ Transaction transaction = storageManager.getTransaction();
+
+ // Get the highest out message property stored under the internal sequence id
+ SequencePropertyBean bean = beanMgr.retrieve(internalSequenceId, Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER);
+
+ transaction.commit();
+
+ Long highestOutMsgKey = Long.valueOf(bean.getValue());
+
+ long timeNow = System.currentTimeMillis();
+ long timeToComplete = timeNow + waitTime;
+ boolean complete = false;
+
+ while (!complete && timeNow < timeToComplete)
+ {
+ timeNow = System.currentTimeMillis();
+
+ try
+ {
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient);
+
+ if (sequenceReport.getCompletedMessages().contains(highestOutMsgKey))
+ complete = true;
+ else
+ Thread.sleep(tickTime);
+
+ }
+ catch (Exception e)
+ {
+ // Ignore: any failure in the report lookup or the sleep is swallowed and the loop retries until the deadline. NOTE(review): this also swallows InterruptedException.
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org