You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2006/12/20 16:20:38 UTC

svn commit: r489111 - in /webservices/sandesha/trunk/java: src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java

Author: gatfora
Date: Wed Dec 20 07:20:37 2006
New Revision: 489111

URL: http://svn.apache.org/viewvc?view=rev&rev=489111
Log:
Add an internalSequenceId to the SequenceAck message

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Wed Dec 20 07:20:37 2006
@@ -280,21 +280,10 @@
 			log.debug("Enter: SandeshaGlobalInHandler::processDroppedMessage");
 
 		if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
-			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			String sequenceId = null;
-
-			if (sequence != null) {
-				sequenceId = sequence.getIdentifier().getIdentifier();
-			}
-
-			SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropertyBeanMgr();
-			SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,
-					Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES);
-			String receivedMsgStr = receivedMsgsBean.getValue();
 
 			// Even though the duplicate message is dropped, hv to send the ack
 			// if needed.
-			SequenceProcessor.sendAckIfNeeded(rmMsgContext, receivedMsgStr, storageManager);
+			SequenceProcessor.sendAckIfNeeded(rmMsgContext, storageManager);
 
 		}
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Wed Dec 20 07:20:37 2006
@@ -263,7 +263,7 @@
 		}
 
 		// Sending acknowledgements
-		sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
+		sendAckIfNeeded(rmMsgCtx, storageManager);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: SequenceProcessor::processReliableMessage " + msgCtxPaused);
@@ -284,7 +284,7 @@
 		return false;
 	}
 
-	public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr, StorageManager storageManager)
+	public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, StorageManager storageManager)
 			throws AxisFault {
 
 		if (log.isDebugEnabled())
@@ -351,6 +351,7 @@
 			ackBean.setMessageID(ackMsgCtx.getMessageID());
 			ackBean.setReSend(false);
 			ackBean.setSequenceID(sequencePropertyKey);
+			ackBean.setInternalSequenceID(sequencePropertyKey);
 			EndpointReference to = ackMsgCtx.getTo();
 			if (to!=null)
 				ackBean.setToAddress(to.getAddress());

Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=diff&rev=489111&r1=489110&r2=489111
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java Wed Dec 20 07:20:37 2006
@@ -10,11 +10,14 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.ConfigurationContextFactory;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.SandeshaTestCase;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SequenceReport;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.RangeString;
@@ -112,7 +115,7 @@
 
 	}
 	
-	public void testForceInvokeWithDiscardGaps () throws AxisFault,InterruptedException  {
+	public void testForceInvokeWithDiscardGaps () throws AxisFault  {
 		
 		String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
 		
@@ -139,11 +142,12 @@
 			clientOptions.setProperty(SandeshaClientConstants.MESSAGE_NUMBER,new Long(3));
 			serviceClient.fireAndForget(getPingOMBlock("ping3"));
 	
-			Thread.sleep(5000);
+			String internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+			waitForMessageToBeAcked(serviceClient, internalSequenceId);
 			
 			StorageManager mgr = SandeshaUtil.getInMemoryStorageManager(configContext);
 			Transaction t = mgr.getTransaction();
-			String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(SandeshaUtil.getInternalSequenceID(to, sequenceKey),
+			String inboundSequenceID = SandeshaUtil.getSequenceIDFromInternalSequenceID(internalSequenceId,
 					mgr);
 			t.commit();
 			
@@ -164,4 +168,51 @@
 
 	}
 	
+  /**
+   * Waits for the maximum of "waittime" for a message to be acked, before returning control to the application.
+   * @throws SandeshaException 
+   */
+  private void waitForMessageToBeAcked(ServiceClient serviceClient, String internalSequenceId) throws SandeshaException
+  {
+    // Get the highest out message number
+    ConfigurationContext context = serviceClient.getServiceContext().getConfigurationContext();
+    StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+    
+    // Get the sequence property bean manager
+    SequencePropertyBeanMgr beanMgr = storageManager.getSequencePropertyBeanMgr();
+    
+    // Get a transaction for the property finding
+    Transaction transaction = storageManager.getTransaction();
+    
+    // Get the highest out message property
+    SequencePropertyBean bean = beanMgr.retrieve(internalSequenceId, Sandesha2Constants.SequenceProperties.HIGHEST_OUT_MSG_NUMBER);
+    
+    transaction.commit();
+    
+    Long highestOutMsgKey = Long.valueOf(bean.getValue());
+    
+    long timeNow = System.currentTimeMillis();
+    long timeToComplete = timeNow + waitTime;
+    boolean complete = false;    
+    
+    while (!complete && timeNow < timeToComplete)
+    {
+      timeNow = System.currentTimeMillis();
+
+      try
+      {                              
+        SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient);
+        
+        if (sequenceReport.getCompletedMessages().contains(highestOutMsgKey))
+          complete = true;
+        else
+          Thread.sleep(tickTime);
+  
+      }
+      catch (Exception e)
+      {
+        // Ignore
+      }
+    }
+  }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org