You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by pa...@apache.org on 2009/04/03 20:27:57 UTC
svn commit: r761754 - in
/webservices/sandesha/trunk/java/modules/core/src/main:
java/org/apache/sandesha2/ java/org/apache/sandesha2/client/
java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/msgprocessors/
java/org/apache/sandesha2/storage/bea...
Author: parsonsd
Date: Fri Apr 3 18:27:57 2009
New Revision: 761754
URL: http://svn.apache.org/viewvc?rev=761754&view=rev
Log:
Fix to allow automatic reallocation of sequences that have timed out or been deleted. The solution is to have a reallocated RMSBean point at the RMSBean created as part of the reallocation via a new RMSBean attribute that contains the internalSeqID of the newly created RMSBean.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Fri Apr 3 18:27:57 2009
@@ -245,6 +245,23 @@
String ENDPOINT = "Endpoint";
String UNSUPPORTED_ELEMENT = "UnsupportedElement";
+
+ //This is to identify an RMSBean that hasn't been reallocated
+ int NOT_REALLOCATED = 0;
+
+ //This is to identify an RMSBean that is to be reallocated or has been reallocated
+ int REALLOCATED = 1;
+
+ //This is to identify an RMSBean that was created for reallocation but then was reallocated itself
+ //That way we know it can be deleted
+ int ORIGINAL_REALLOCATED_BEAN_COMPLETE = 2;
+
+ //This is to identify the RMS Bean that was created to reallocate another RMSBean
+ int RMS_BEAN_USED_FOR_REALLOCATION = 3;
+
+ //This is to identify an RMSBean that was attempted to be reallocated but for some reason the reallocation failed.
+ int REALLOCATION_FAILED = -1;
+
}
public interface WSA {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Fri Apr 3 18:27:57 2009
@@ -443,8 +443,19 @@
if (terminatedSequence) {
// Delete the rmsBean
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+
+ if(tran != null && tran.isActive()) tran.commit();
+ tran = storageManager.getTransaction();
+
+ //Need to check if it's an RMSBean created for reallocation. If so we need to
+ //delete the original RMSBean that was reallocated.
+ RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID());
+ if(reallocatedRMSBean != null){
+ if (log.isDebugEnabled())
+ log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean);
+ storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+ }
}
-
if(tran != null && tran.isActive()) tran.commit();
tran = null;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Fri Apr 3 18:27:57 2009
@@ -79,8 +79,8 @@
public static final String propertyInvalidValue="propertyInvalidValue";
public static final String invalidRange="invalidRange";
public static final String workAlreadyAssigned="workAlreadyAssigned";
- public static final String reallocationFailed="reallocationFailed";
-
+ public static final String reallocationFailed="reallocationFailed";
+ public static final String reallocationForSyncRequestReplyNotSupported="reallocationForSyncRequestReplyNotSupported";
public static final String rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
public static final String unknownWSAVersion="unknownWSAVersion";
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri Apr 3 18:27:57 2009
@@ -177,8 +177,6 @@
if (msgContext.getMessageID() == null)
msgContext.setMessageID(SandeshaUtil.getUUID());
-
-
/*
* Internal sequence id is the one used to refer to the sequence (since
* actual sequence id is not available when first msg arrives) server
@@ -230,13 +228,47 @@
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+ boolean autoStartNewSeqForReallocation = false;
//if this is an existing sequence then we need to do some checks first
if(rmsBean != null)
{
+ //If the sequence has been reallocated we need to find out the new internalSeqID.
+ //If the internalSeqID hasn't been set yet we should auto restart. If it has a new
+ //internalSeqID we just send the message on the new reallocated sequence.
+ int seqReallocated = rmsBean.isReallocated();
+ if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATED){
+ if (log.isDebugEnabled())
+ log.debug("ApplicationMsgProcessor: Reallocated Sequence: " + rmsBean.getSequenceID());
+ //Try and get the new internalSeqID
+ internalSequenceId = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+ if(internalSequenceId != null){
+ if (log.isDebugEnabled())
+ log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " + internalSequenceId);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
+ rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+ } else {
+ autoStartNewSeqForReallocation = true;
+ }
+ } else if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){
+ //We can't do anymore as we have already tried to reallocate this sequence.
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(),
+ "We have already attempted to reallocate this Sequence and we won't try again. The sequance needs to be cleaned up manually."));
+ }
+
//see if the sequence is closed
- if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut()){
+ if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut() || autoStartNewSeqForReallocation){
if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
internalSequenceId = getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
+ if(autoStartNewSeqForReallocation){
+ if (log.isDebugEnabled())
+ log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation: InternalSeqID of new sequence used for reallocation: "
+ + internalSequenceId);
+ rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ if(tran != null && tran.isActive()) tran.commit();
+ tran = storageManager.getTransaction();
+ }
if (log.isDebugEnabled())
log.debug("ApplicationMsgProcessor: auto start new sequence " + internalSequenceId + " :: " + rmsBean);
//set this new internal sequence ID on the msg
@@ -337,6 +369,11 @@
if (rmsBean == null) {
rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+
+ if(autoStartNewSeqForReallocation){
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION);
+ }
+
if(rmsBean != null) outSequenceID = rmsBean.getSequenceID();
if (rmsBean == null && appMsgProcTran != null && appMsgProcTran.isActive()) {
@@ -348,7 +385,6 @@
appMsgProcTran = storageManager.getTransaction();
}
}
-
}
} else {
@@ -554,6 +590,7 @@
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+
return true;
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr 3 18:27:57 2009
@@ -154,7 +154,7 @@
if(!rmsBeanMgr.update(rmsBean)){
//Im not setting the createSeqBean sender bean to resend true as the reallocation of msgs will do this
try{
- TerminateManager.terminateSendingSide(rmsBean, storageManager, true);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction);
} catch(Exception e){
if (log.isDebugEnabled())
log.debug(e);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr 3 18:27:57 2009
@@ -74,7 +74,7 @@
}
}
- TerminateManager.terminateSendingSide (rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide (rmsBean, storageManager, false, null);
// Stop this message travelling further through the Axis runtime
terminateResRMMsg.pause();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Fri Apr 3 18:27:57 2009
@@ -19,6 +19,7 @@
package org.apache.sandesha2.storage.beans;
+import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.util.Range;
import org.apache.sandesha2.util.RangeString;
@@ -152,6 +153,22 @@
* be ignored within the match method.
*/
private int rmsFlags = 0;
+
+ /**
+ * Indicates the reallocation state. The states can be either:
+ * notReallocated - The bean hasn't been reallocated
+ * reallocated - The bean is to be reallocated
+ * ReallocatedBeanComplete - The bean was created for reallocation but is no longer needed as itself has been reallocated
+ * BeanUsedForReallocation - The bean was created for reallocation
+ * ReallocationFailed - The reallocation of this bean failed
+ */
+ private int reallocated = Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED;
+
+ /**
+ * Contains the internalSeqID of the seq that has sent the reallocated msgs
+ */
+ private String internalSeqIDOfSeqUsedForReallocation = null;
+
public static final int LAST_SEND_ERROR_TIME_FLAG = 0x00000001;
public static final int LAST_OUT_MSG_FLAG = 0x00000010;
public static final int HIGHEST_OUT_MSG_FLAG = 0x00000100;
@@ -195,7 +212,9 @@
terminationPauserForCS = beanToCopy.isTerminationPauserForCS();
timedOut = beanToCopy.isTimedOut();
transportTo = beanToCopy.getTransportTo();
- avoidAutoTermination = beanToCopy.isAvoidAutoTermination();
+ avoidAutoTermination = beanToCopy.isAvoidAutoTermination();
+ reallocated = beanToCopy.isReallocated();
+ internalSeqIDOfSeqUsedForReallocation = beanToCopy.getInternalSeqIDOfSeqUsedForReallocation();
}
public String getCreateSeqMsgID() {
@@ -434,6 +453,8 @@
result.append("\nClientCompletedMsgs: "); result.append(clientCompletedMessages);
result.append("\nAnonymous UUID : "); result.append(anonymousUUID);
result.append("\nSOAPVersion : "); result.append(soapVersion);
+ result.append("\nReallocated : "); result.append(reallocated);
+ result.append("\nInternalSeqIDOfSeqUsedForReallocation : "); result.append(internalSeqIDOfSeqUsedForReallocation);
return result.toString();
}
@@ -478,6 +499,9 @@
else if(bean.getAnonymousUUID() != null && !bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
match = false;
+ else if((bean.getInternalSeqIDOfSeqUsedForReallocation() != null && !bean.getInternalSeqIDOfSeqUsedForReallocation().equals(this.getInternalSeqIDOfSeqUsedForReallocation())))
+ match = false;
+
// Avoid matching on the error information
// else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
// match = false;
@@ -511,8 +535,26 @@
else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && bean.getExpectedReplies() != this.getExpectedReplies())
match = false;
+
+
return match;
}
+ public int isReallocated() {
+ return reallocated;
+ }
+
+ public void setReallocated(int reallocated) {
+ this.reallocated = reallocated;
+ }
+
+ public String getInternalSeqIDOfSeqUsedForReallocation() {
+ return internalSeqIDOfSeqUsedForReallocation;
+ }
+
+ public void setInternalSeqIDOfSeqUsedForReallocation(String internalSeqIDOfSeqUsedForReallocation) {
+ this.internalSeqIDOfSeqUsedForReallocation = internalSeqIDOfSeqUsedForReallocation;
+ }
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Fri Apr 3 18:27:57 2009
@@ -619,7 +619,7 @@
if (log.isDebugEnabled())
log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
- // Sending the message
+ //Sending the message
//having a surrounded try block will make sure that the error is logged here
//and that this does not disturb the processing of a carrier message.
try {
@@ -671,7 +671,7 @@
}
- private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+ private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::manageIncomingFault");
InvocationResponse response = InvocationResponse.CONTINUE;
@@ -743,7 +743,7 @@
} else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) ||
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) ||
Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode)) {
- processSequenceUnknownFault(rmMsgCtx, fault, identifier);
+ processSequenceUnknownFault(rmMsgCtx, fault, identifier, transaction);
}
// If the operation is an Sandesha In Only operation, or the fault is a recognised fault,
@@ -783,7 +783,7 @@
// constructing the fault
AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, rmMsgCtx);
- response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+ response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, transaction);
if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
@@ -966,7 +966,7 @@
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmsBean);
- TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
@@ -980,7 +980,7 @@
* @param fault
* @param identifier
*/
- private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID) throws AxisFault {
+ private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID);
@@ -998,16 +998,16 @@
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmsBean);
- if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true)){
+ if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction)){
// We did not reallocate so we notify the clients of a failure
notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+
+ //Mark the RMSBean as reallocation failed and update last activation time
+ transaction = storageManager.getTransaction();
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ if(transaction != null && transaction.isActive()) transaction.commit();
}
-
- // Update the last activated time.
- rmsBean.setLastActivatedTime(System.currentTimeMillis());
-
- // Update the bean in the map
- storageManager.getRMSBeanMgr().update(rmsBean);
}
else {
RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Fri Apr 3 18:27:57 2009
@@ -44,6 +44,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.client.async.Callback;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -59,6 +60,8 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.Handler;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,6 +77,7 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -1015,10 +1019,22 @@
return targetEnv;
}
-
- public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, List<MessageContext> msgsToSend)throws AxisFault{
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence");
+
+
+ /**
+ * ReallocateMessages to a new sequence
+ * @param storageManager
+ * @param oldRMSBean
+ * @param msgsToSend
+ * @param transaction
+ *
+ */
+ public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean,
+ List<MessageContext> msgsToSend, Transaction transaction)
+ throws AxisFault, SandeshaException{
+
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence");
ConfigurationContext ctx = storageManager.getContext();
ServiceClient client = new ServiceClient(ctx, null);
@@ -1027,30 +1043,68 @@
Options options = client.getOptions();
options.setTo(oldRMSBean.getToEndpointReference());
options.setReplyTo(oldRMSBean.getReplyToEndpointReference());
-
- //internal sequence ID is different
- String internalSequenceID = oldRMSBean.getInternalSequenceID();
- //we also need to obtain the sequenceKey from the internalSequenceID.
- String oldSequenceKey =
- SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress());
- //remove the old sequence key from the internal sequence ID
- internalSequenceID = internalSequenceID.substring(0, internalSequenceID.length()-oldSequenceKey.length());
- options.setProperty(SandeshaClientConstants.SEQUENCE_KEY,
- SandeshaUtil.getUUID()); //using a new sequence Key to differentiate from the old sequence
- options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
- options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
- options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
-
- //send the msgs - this will setup a new sequence to the same endpoint
- Iterator<MessageContext> it = msgsToSend.iterator();
- while(it.hasNext()){
- MessageContext msgCtx = (MessageContext)it.next();
- client.getOptions().setAction(msgCtx.getWSAAction());
- client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
- }
-
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence");
+
+ //internal sequence ID is different
+ String internalSequenceID = oldRMSBean.getInternalSequenceID();
+
+ options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+ options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
+ options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
+
+ //Update the RMSBean so as to mark it as reallocated if it isn't an RMSbean created for a previous reallocation
+ RMSBean originallyReallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, oldRMSBean.getInternalSequenceID());
+ if(originallyReallocatedRMSBean == null){
+ oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATED);
+ storageManager.getRMSBeanMgr().update(oldRMSBean);
+ } else {
+ options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, originallyReallocatedRMSBean.getInternalSequenceID());
+ originallyReallocatedRMSBean.setInternalSeqIDOfSeqUsedForReallocation(null);
+ storageManager.getRMSBeanMgr().update(originallyReallocatedRMSBean);
+
+ //Setting this property so that the bean can be deleted
+ oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE);
+ oldRMSBean.setInternalSeqIDOfSeqUsedForReallocation(originallyReallocatedRMSBean.getInternalSequenceID());
+ storageManager.getRMSBeanMgr().update(oldRMSBean);
+ }
+
+ //Commit current transaction that wraps the manageFaultMsg as we are about to start
+ //resending msgs on a new seq and they will need to get a transaction on the
+ //current thread
+ if(transaction != null && transaction.isActive()) transaction.commit();
+
+ //send the msgs - this will setup a new sequence to the same endpoint
+ Iterator<MessageContext> it = msgsToSend.iterator();
+
+ while(it.hasNext()){
+ MessageContext msgCtx = (MessageContext)it.next();
+
+ //Set the action
+ client.getOptions().setAction(msgCtx.getWSAAction());
+
+ //Set the message ID
+ client.getOptions().setMessageId(msgCtx.getMessageID());
+
+ //Get the AxisOperation
+ AxisOperation axisOperation = msgCtx.getAxisOperation();
+
+ //If it's oneway or async, reallocate
+ //Fail if replyTo is annonymous as this is currently not supported because in twoway we can't get responses back to th eold something
+ if(axisOperation.getAxisSpecificMEPConstant() == WSDLConstants.MEP_CONSTANT_OUT_ONLY){
+ client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
+ } else if (client.getOptions().getReplyTo().hasAnonymousAddress()){
+ oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(oldRMSBean);
+ throw new SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported);
+ } else {
+ MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
+ Object callback = ((CallbackReceiver)msgReceiver).lookupCallback(msgCtx.getMessageID());
+ client.setAxisService(msgCtx.getAxisService());
+ client.sendReceiveNonBlocking(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement(), (Callback)callback);
+ }
+ }
+
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence");
}
/**
@@ -1276,4 +1330,16 @@
return result;
}
+ public static RMSBean isLinkedToReallocatedRMSBean(StorageManager storageManager, String internalSeqID) throws SandeshaException {
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean");
+
+ //Need to check if it's an RMSBean created for reallocation.
+ RMSBean finderBean = new RMSBean();
+ finderBean.setInternalSeqIDOfSeqUsedForReallocation(internalSeqID);
+ RMSBean reallocatedRMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean);
+
+ if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean, ReallocatedRMSBean: " + reallocatedRMSBean);
+ return reallocatedRMSBean;
+ }
+
}
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Fri Apr 3 18:27:57 2009
@@ -25,8 +25,11 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -40,12 +43,13 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
/**
@@ -231,14 +235,14 @@
* @return true if the reallocation happened sucessfully
*/
public static boolean terminateSendingSide(RMSBean rmsBean,
- StorageManager storageManager, boolean reallocate) throws SandeshaException {
+ StorageManager storageManager, boolean reallocate, Transaction transaction) throws SandeshaException {
// Indicate that the sequence is terminated
rmsBean.setTerminated(true);
rmsBean.setTerminateAdded(true);
storageManager.getRMSBeanMgr().update(rmsBean);
- return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate);
+ return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate, transaction);
}
public static void timeOutSendingSideSequence(String internalSequenceId,
@@ -249,11 +253,11 @@
rmsBean.setLastActivatedTime(System.currentTimeMillis());
storageManager.getRMSBeanMgr().update(rmsBean);
- cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false);
+ cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false, null);
}
private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager,
- RMSBean rmsBean, boolean reallocateIfPossible) throws SandeshaException {
+ RMSBean rmsBean, boolean reallocateIfPossible, Transaction transaction) throws SandeshaException {
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible);
@@ -274,12 +278,15 @@
if(ranges.length==1){
//the sequence is a single contiguous acked range
lastAckedMsg = ranges[0].upperValue;
- }
- else{
- //cannot reallocate as there are gaps
- reallocateIfPossible=false;
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
- log.debug("cannot reallocate sequence as there are gaps");
+ } else{
+ if(reallocateIfPossible){
+ //cannot reallocate as there are gaps
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ reallocateIfPossible=false;
+ if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+ log.debug("cannot reallocate sequence as there are gaps");
+ }
}
while (iterator.hasNext()) {
@@ -332,14 +339,48 @@
if(reallocateIfPossible){
try{
- SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);
- reallocatedOK = true;
- }
- catch(Exception e){
- //want that the reallocation failed
+ SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction);
+ reallocatedOK = true;
+
+ //If the reallocation was successful and the RMSBean being reallocated was originally created for reallocation
+ //the RMSBean can be deleted.
+ transaction = storageManager.getTransaction();
+ if(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ }
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+ } catch(Exception e){
+
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), e.toString()));
- }
+
+ //Reallocation Failed
+ //Need to mark any RMSBeans involved as failed so that we don't attempt to send
+ //anymore messages on these seq's. The client will have to manually reallocate and
+ //administer the sequences.
+ transaction = storageManager.getTransaction();
+
+ rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+ if(intSeqIDOfOriginallyReallocatedSeq != null){
+ RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq);
+ origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+ storageManager.getRMSBeanMgr().update(origRMSBean);
+ }
+
+ if(transaction != null && transaction.isActive()) transaction.commit();
+ transaction = null;
+
+ } finally {
+ if (transaction != null && transaction.isActive()) {
+ transaction.rollback();
+ }
+ }
}
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Fri Apr 3 18:27:57 2009
@@ -427,7 +427,7 @@
private void deleteRMSBeans(List<RMSBean> rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime)
- throws SandeshaStorageException {
+ throws SandeshaStorageException, SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter: Sender::deleteRMSBeans");
@@ -437,12 +437,24 @@
RMSBean rmsBean = (RMSBean) beans.next();
long timeNow = System.currentTimeMillis();
long lastActivated = rmsBean.getLastActivatedTime();
+
// delete sequences that have been timedout or deleted for more than
// the SequenceRemovalTimeoutInterval
-
- if ((lastActivated + deleteTime) < timeNow) {
+ if (((lastActivated + deleteTime) < timeNow) &&
+ (rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED)) {
if (log.isDebugEnabled())
log.debug("Removing RMSBean " + rmsBean);
+
+ //Need to check if it's an RMSBean created for reallocation. If so we need to
+ //delete the original RMSBean that was reallocated.
+ RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID());
+
+ if(reallocatedRMSBean != null){
+ if (log.isDebugEnabled())
+ log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean);
+ storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+ }
+
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
}
@@ -616,7 +628,7 @@
// Mark the sequence as terminated
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
- TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found. Deleting this message with a sequence ID of : " + id);
// Delete the terminate sender bean.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr 3 18:27:57 2009
@@ -418,7 +418,7 @@
String sequenceID = terminateSequence.getIdentifier().getIdentifier();
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
- TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Fri Apr 3 18:27:57 2009
@@ -82,7 +82,8 @@
msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null.
transportOutNotPresent=Sandesha2 Internal Error: original transport sender is not present.
workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one.
-reallocationFailed=The sequence ''{0}'' could not be reallocated due to the error ''{1}''.
+reallocationFailed=Reallocation of msgs from sequence ''{0}'' failed, ''{1}''.
+reallocationForSyncRequestReplyNotSupported=Reallocation for sync requestReply not supported.
couldNotFindOperation=Could not find operation for message type {0} and spec level {1}.
cannotChooseAcksTo=Could not find an appropriate acksTo for the reply sequence, given inbound sequence {0} and bean info {1}.
cannotChooseSpecLevel=Could not find an appropriate specification level for the reply sequence, given inbound sequence {0} and bean info {1}.
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org