You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC

svn commit: r531400 [7/18] - in /webservices/sandesha/trunk/java/modules: client/ core/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,165 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.storage.inmemory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beans.RMBean;
+import org.apache.sandesha2.workers.SandeshaThread;
+
+/**
+ * This class does not really implement transactions, but it is a good
+ * place to implement locking for the in memory storage manager.
+ */
+
+public class InMemoryTransaction implements Transaction {
+
+	private static final Log log = LogFactory.getLog(InMemoryTransaction.class);
+
+	private InMemoryStorageManager manager;
+	private String threadName;
+	private int    threadId;
+	private ArrayList enlistedBeans = new ArrayList();
+	private InMemoryTransaction waitingForTran = null;
+	private boolean sentMessages = false;
+	private boolean receivedMessages = false;
+	
+	InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
+		this.manager = manager;
+		this.threadName = threadName;
+		this.threadId = id;
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::<init>, " + this);
+	}
+	
+	public void commit() {
+		releaseLocks();
+		if(sentMessages) manager.getSender().wakeThread();
+		if(receivedMessages) {
+			SandeshaThread invoker = manager.getInvoker();
+			if(invoker != null) invoker.wakeThread();
+		}
+	}
+
+	public void rollback() {
+		releaseLocks();
+	}
+	
+	public boolean isActive () {
+		return !enlistedBeans.isEmpty();
+	}
+
+	public void enlist(RMBean bean) throws SandeshaStorageException {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
+		if(bean != null) {
+			synchronized (bean) {
+				InMemoryTransaction other = (InMemoryTransaction) bean.getTransaction();
+				while(other != null && other != this) {
+					// Put ourselves into the list of waiters
+					waitingForTran = other;
+
+					// Look to see if there is a loop in the chain of waiters
+					if(!enlistedBeans.isEmpty()) {
+						HashSet set = new HashSet();
+						set.add(this);
+						while(other != null) {
+							if(set.contains(other)) {
+								String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
+								SandeshaStorageException e = new SandeshaStorageException(message);
+								
+								// Do our best to get out of the way of the other work in the system
+								waitingForTran = null;
+								releaseLocks();
+								
+								if(log.isDebugEnabled()) log.debug(message, e);
+								throw e;
+							}
+							set.add(other);
+							other = other.waitingForTran;
+						}
+					}
+
+					try {
+						if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
+						bean.wait();
+					} catch(InterruptedException e) {
+						// Do nothing
+					}
+					other = (InMemoryTransaction) bean.getTransaction();
+				}
+				
+				waitingForTran = null;
+				if(other == null) {
+					if(log.isDebugEnabled()) log.debug(this + " locking bean");
+					bean.setTransaction(this);
+					enlistedBeans.add(bean);
+				}
+			}
+		}
+		
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::enlist");
+	}
+	
+	private void releaseLocks() {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::releaseLocks, " + this);
+		manager.removeTransaction(this);
+
+		Iterator beans = enlistedBeans.iterator();
+		while(beans.hasNext()) {
+			RMBean bean = (RMBean) beans.next();
+			synchronized (bean) {
+				bean.setTransaction(null);
+				bean.notify();
+			}
+		}
+		enlistedBeans.clear();
+		
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::releaseLocks");
+	}
+	
+	public String toString() {
+		StringBuffer result = new StringBuffer();
+		result.append("[InMemoryTransaction, id:");
+		result.append(threadId);
+		result.append(", name: ");
+		result.append(threadName);
+		result.append(", locks: ");
+		result.append(enlistedBeans.size());
+		result.append("]");
+		return result.toString();
+	}
+
+	public void setReceivedMessages(boolean receivedMessages) {
+		this.receivedMessages = receivedMessages;
+	}
+
+	public void setSentMessages(boolean sentMessages) {
+		this.sentMessages = sentMessages;
+	}
+}
+
+
+

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.transport;
+
+import org.apache.axis2.description.TransportOutDescription;
+
+public class Sandesha2TransportOutDesc extends TransportOutDescription {
+
+    public Sandesha2TransportOutDesc() {
+        super ("Sandesha2TransportOutDesc");
+        this.setSender(new Sandesha2TransportSender ());
+    }
+	
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.transport;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.HandlerDescription;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+public class Sandesha2TransportSender implements TransportSender  {
+
+	private static final Log log = LogFactory.getLog(Sandesha2TransportSender.class);
+
+	public void cleanup(MessageContext msgContext) throws AxisFault {
+
+	}
+
+	public void stop() {
+
+	}
+
+	public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
+		
+		if (log.isDebugEnabled())
+			log.debug("Enter: Sandesha2TransportSender::invoke, " + msgContext.getEnvelope().getHeader());
+		
+		//setting the correct transport sender.
+		TransportOutDescription transportOut = (TransportOutDescription) msgContext.getProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC);
+		
+		if (transportOut==null)
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.transportOutNotPresent));
+
+		msgContext.setTransportOut(transportOut);
+		
+		String key =  (String) msgContext.getProperty(Sandesha2Constants.MESSAGE_STORE_KEY);
+		
+		if (key==null)
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.cannotGetStorageKey));
+		
+		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+		AxisConfiguration axisConfiguration = configurationContext.getAxisConfiguration();
+		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,axisConfiguration);
+		
+		msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
+		
+		storageManager.storeMessageContext(key,msgContext);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: Sandesha2TransportSender::invoke");
+		return InvocationResponse.CONTINUE;
+	}
+
+	//Below methods are not used
+	public void cleanUp(MessageContext msgContext) throws AxisFault {}
+
+	public void init(ConfigurationContext confContext, TransportOutDescription transportOut) throws AxisFault {}
+
+	public void cleanup() {}
+
+	public HandlerDescription getHandlerDesc() {return null;}
+
+	public String getName() { return null;}
+
+	public Parameter getParameter(String name) {  return null; }
+
+	public void init(HandlerDescription handlerdesc) {}
+
+	public void flowComplete(MessageContext msgContext) {}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,324 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Contains logic for managing acknowledgements.
+ */
+
+public class AcknowledgementManager {
+
+	private static Log log = LogFactory.getLog(AcknowledgementManager.class);
+
+	/**
+	 * Piggybacks any available acks of the same sequence to the given
+	 * application message.
+	 * 
+	 * @param applicationRMMsgContext
+	 * @throws SandeshaException
+	 */
+	public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
+			throws SandeshaException {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
+		
+		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+		// If this message is going to an anonymous address, and the inbound sequence has
+		// anonymous acksTo, then we add in an ack for the inbound sequence.
+		EndpointReference target = rmMessageContext.getTo();
+		if(target == null || target.hasAnonymousAddress()) {
+			if(target != null && SandeshaUtil.isWSRMAnonymous(target.getAddress())) {
+				// Search for any sequences that have an acksTo that matches this target, and add an ack
+				// for each of them.
+				RMDBean findBean = new RMDBean();
+				findBean.setAcksToEPR(target.getAddress());
+				findBean.setTerminated(false);
+				Collection rmdBeans = storageManager.getRMDBeanMgr().find(findBean);
+				Iterator sequences = rmdBeans.iterator();
+				while(sequences.hasNext()) {
+					RMDBean sequence = (RMDBean) sequences.next();
+					if (sequence.getHighestInMessageNumber() > 0) {
+						if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequence.getSequenceID());
+						RMMsgCreator.addAckMessage(rmMessageContext, sequence.getSequenceID(), sequence);
+					}
+				}
+				
+			} else {
+				// We have no good indicator of the identity of the destination, so the only sequence
+				// we can ack is the inbound one that caused us to create this response.
+				String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+				if(inboundSequence != null) {
+					RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
+					if(inboundBean != null && !inboundBean.isTerminated()) {
+						String acksTo = inboundBean.getAcksToEPR();
+						EndpointReference acksToEPR = new EndpointReference(acksTo);
+						
+						if(acksTo == null || acksToEPR.hasAnonymousAddress()) {
+							if(log.isDebugEnabled()) log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+							RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean);
+						}
+					}
+				}
+			}
+			if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
+			return;
+		}
+		
+		// From here on, we must be dealing with a real address. Piggyback all sequences that have an
+		// acksTo that matches the To address, and that have an ackMessage queued up for sending.
+		SenderBean findBean = new SenderBean();
+		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+		findBean.setSend(true);
+		findBean.setToAddress(target.getAddress());
+
+		Collection collection = retransmitterBeanMgr.find(findBean);
+		Iterator it = collection.iterator();
+		while (it.hasNext()) {
+			SenderBean ackBean = (SenderBean) it.next();
+
+			// Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
+			long timeNow = System.currentTimeMillis();
+			if (ackBean.getTimeToSend() > timeNow) {
+				// Delete the beans that would have sent the ack
+				retransmitterBeanMgr.delete(ackBean.getMessageID());
+				storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
+
+				String sequenceId = ackBean.getSequenceID();
+				if (log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
+
+				RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+				if(rmdBean != null && !rmdBean.isTerminated()) {
+					RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
+				}
+			}
+		}
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
+	}
+
+	/**
+	 * 
+	 * @param referenceRMMessage
+	 * @param sequencePropertyKey
+	 * @param sequenceId
+	 * @param storageManager
+	 * @param makeResponse Some work will be done to make the new ack message the response of the reference message.
+	 * @return
+	 * @throws AxisFault
+	 */
+	public static RMMsgContext generateAckMessage(
+			
+			RMMsgContext referenceRMMessage,
+			RMDBean rmdBean,
+			String sequenceId,
+			StorageManager storageManager, 
+			boolean serverSide
+			
+			) throws AxisFault {
+		
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::generateAckMessage " + rmdBean);
+
+		MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+
+		EndpointReference acksTo = new EndpointReference(rmdBean.getAcksToEPR());
+
+		if (acksTo.getAddress() == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.ACK,
+				referenceRMMessage.getRMSpecVersion(),
+				referenceMsg.getAxisService());
+
+		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
+		
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+		ackRMMsgCtx.setFlow(MessageContext.OUT_FLOW);
+		ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(referenceMsg.getEnvelope()));
+
+		// Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+
+		ackMsgCtx.setEnvelope(envelope);
+
+		ackMsgCtx.setTo(acksTo);
+		
+		ackMsgCtx.setServerSide(serverSide);
+
+		// adding the SequenceAcknowledgement part.
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::generateAckMessage");
+		return ackRMMsgCtx;
+	}
+
+	
+	
+
+	public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
+
+		boolean result = false;
+		Range complete = new Range(1, lastMessageNo);
+		if(ackRanges.isRangeCompleted(complete)) {
+			result = true;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::verifySequenceCompletion " + result);
+		return result;
+	}
+	
+	public static void addAckBeanEntry (
+			RMMsgContext ackRMMsgContext,
+			String sequenceId, 
+			long timeToSend,
+			StorageManager storageManager) throws AxisFault {
+		if(log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
+
+		// Write the acks into the envelope
+		ackRMMsgContext.addSOAPEnvelope();
+		
+		MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
+
+		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+		String key = SandeshaUtil.getUUID();
+
+		SenderBean ackBean = new SenderBean();
+		ackBean.setMessageContextRefKey(key);
+		ackBean.setMessageID(ackMsgContext.getMessageID());
+		ackBean.setReSend(false);
+		ackBean.setSequenceID(sequenceId);
+		EndpointReference to = ackMsgContext.getTo();
+		if (to!=null)
+			ackBean.setToAddress(to.getAddress());
+
+		ackBean.setSend(true);
+		ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+		ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+		// removing old acks.
+		SenderBean findBean = new SenderBean();
+		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+		findBean.setSend(true);
+		findBean.setReSend(false);
+		findBean.setSequenceID(sequenceId);
+		Collection coll = retransmitterBeanMgr.find(findBean);
+		Iterator it = coll.iterator();
+
+		while(it.hasNext()) {
+			SenderBean oldAckBean = (SenderBean) it.next();
+			if(oldAckBean.getTimeToSend() < timeToSend)
+				timeToSend = oldAckBean.getTimeToSend();
+
+			// removing the retransmitted entry for the oldAck
+			retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+
+			// removing the message store entry for the old ack
+			storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
+		}
+
+		ackBean.setTimeToSend(timeToSend);
+
+		ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+		
+		// passing the message through sandesha2sender
+		ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+		
+		SandeshaUtil.executeAndStore(ackRMMsgContext, key);
+
+		// inserting the new ack.
+		retransmitterBeanMgr.insert(ackBean);
+
+		if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
+	}
+	
+	public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementManager::sendAckNow");
+
+		// Write the acks into the envelope
+		ackRMMsgContext.addSOAPEnvelope();
+		
+		MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
+		ConfigurationContext configContext = ackMsgContext.getConfigurationContext();
+		
+		// setting CONTEXT_WRITTEN since acksto is anonymous
+		if (ackRMMsgContext.getMessageContext().getOperationContext() == null) {
+			// operation context will be null when doing in a GLOBAL
+			// handler.
+			AxisOperation op = ackMsgContext.getAxisOperation();
+
+			OperationContext opCtx = ContextFactory.createOperationContext(op, ackRMMsgContext.getMessageContext().getServiceContext());
+			ackRMMsgContext.getMessageContext().setOperationContext(opCtx);
+		}
+
+		ackRMMsgContext.getMessageContext().getOperationContext().setProperty(
+				org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
+		ackRMMsgContext.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+		ackRMMsgContext.getMessageContext().setServerSide(true);
+		
+		AxisEngine engine = new AxisEngine(configContext);
+		engine.send(ackMsgContext);
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementManager::sendAckNow");		
+	}	
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,818 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axiom.soap.SOAPFaultCode;
+import org.apache.axiom.soap.SOAPFaultDetail;
+import org.apache.axiom.soap.SOAPFaultReason;
+import org.apache.axiom.soap.SOAPFaultSubCode;
+import org.apache.axiom.soap.SOAPFaultText;
+import org.apache.axiom.soap.SOAPFaultValue;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.FaultData;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.SequenceFault;
+
+/**
+ * Has logic to check for possible RM related faults and create it.
+ */
+
+public class FaultManager {
+
+	private static final Log log = LogFactory.getLog(FaultManager.class);
+
+	/**
+	 * Check weather the LastMessage number has been exceeded and generate the
+	 * fault if it is.
+	 * 
+	 * @param msgCtx
+	 * @return
+	 */
+	public static void checkForLastMsgNumberExceeded(RMMsgContext applicationRMMessage, StorageManager storageManager)
+			throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::checkForLastMsgNumberExceeded");
+/*	
+ * TODO - This code currently doesn't actually work	
+		Sequence sequence = (Sequence) applicationRMMessage.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+		long messageNumber = sequence.getMessageNumber().getMessageNumber();
+		String sequenceID = sequence.getIdentifier().getIdentifier();
+
+		boolean lastMessageNumberExceeded = false;
+		String reason = null;
+		
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+		if (rmsBean != null) {
+			long lastMessageNo = rmsBean.getLastOutMessage();
+			if (messageNumber > lastMessageNo) {
+				lastMessageNumberExceeded = true;
+				reason = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberLargerThanLastMsg, Long
+						.toString(messageNumber), Long.toString(lastMessageNo));
+			}
+		}
+
+		if (lastMessageNumberExceeded) {
+			FaultData faultData = new FaultData();
+			faultData.setType(Sandesha2Constants.SOAPFaults.FaultType.LAST_MESSAGE_NO_EXCEEDED);
+			int SOAPVersion = SandeshaUtil.getSOAPVersion(applicationRMMessage.getSOAPEnvelope());
+			if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+				faultData.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+			else
+				faultData.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+			faultData.setSubcode(Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED);
+			faultData.setReason(reason);
+			
+			SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+			String rmNamespace = applicationRMMessage.getRMNamespaceValue();
+			OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+					rmNamespace, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			identifierElement.setText(sequenceID);
+			
+			faultData.setDetail(identifierElement);
+			
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded, lastMessageNumberExceeded");
+			getFault(applicationRMMessage, faultData, storageManager);
+		}
+*/
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded");
+	}
+
+	public static RMMsgContext checkForMessageNumberRoleover(MessageContext messageContext) {
+		return null;
+	}
+
+	/**
+	 * Check whether a Sequence message (a) belongs to a unknown sequence
+	 * (generates an UnknownSequence fault) (b) message number exceeds a
+	 * predifined limit ( genenrates a Message Number Rollover fault)
+	 * 
+	 * @param msgCtx
+	 * @return true if no exception has been thrown and the sequence doesn't exist 
+	 * @throws SandeshaException
+	 */
+	public static boolean checkForUnknownSequence(RMMsgContext rmMessageContext, String sequenceID,
+			StorageManager storageManager) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::checkForUnknownSequence, " + sequenceID);
+
+		boolean validSequence = false;
+
+		// Look for an outbound sequence
+		if (SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID) != null) {
+			validSequence = true;
+			// Look for an inbound sequence
+		} else if(SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID) != null) { 
+				validSequence = true;
+		}
+
+		if (!validSequence) {
+
+			if (log.isDebugEnabled())
+				log.debug("Sequence not valid " + sequenceID);
+
+			// Return an UnknownSequence error
+			MessageContext messageContext = rmMessageContext.getMessageContext();
+
+			int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+			FaultData data = new FaultData();
+			if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+				data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+			else
+				data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+			data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), 
+					Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE ));
+
+			SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+
+			OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+					rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			identifierElement.setText(sequenceID);
+			
+			data.setDetail(identifierElement);
+
+			data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSequenceFault, sequenceID));
+			
+			data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE);
+
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::checkForUnknownSequence, Sequence unknown");
+			getOrSendFault(rmMessageContext, data);
+			return true;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForUnknownSequence");
+		return false;
+	}
+
+	/**
+	 * Check weather the Acknowledgement is invalid and generate a fault if it
+	 * is.
+	 * 
+	 * @param msgCtx
+	 * @return
+	 * @throws SandeshaException
+	 */
+	public static boolean checkForInvalidAcknowledgement(RMMsgContext ackRMMessageContext, SequenceAcknowledgement sequenceAcknowledgement,
+			StorageManager storageManager, RMSBean rmsBean)
+			throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::checkForInvalidAcknowledgement");
+
+		// check lower<=upper
+		if (ackRMMessageContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::checkForInvalidAcknowledgement, MessageType not an ACK");
+		}
+
+		boolean invalidAck = false;
+		
+		List sequenceAckList = sequenceAcknowledgement.getAcknowledgementRanges();
+		Iterator it = sequenceAckList.iterator();
+
+		while (it.hasNext()) {
+			AcknowledgementRange acknowledgementRange = (AcknowledgementRange) it.next();
+			long upper = acknowledgementRange.getUpperValue();
+			long lower = acknowledgementRange.getLowerValue();
+
+			if (lower > upper) {
+				invalidAck = true;					
+				// check upper isn't bigger than the highest out msg number
+			} else if ( upper > rmsBean.getHighestOutMessageNumber() ) {
+				invalidAck = true;
+			}
+				
+			if (invalidAck) {
+				makeInvalidAcknowledgementFault(ackRMMessageContext, sequenceAcknowledgement, 
+						acknowledgementRange, storageManager);
+				return true;
+			}
+		}		
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForInvalidAcknowledgement");
+		return false;
+	}
+
+	/**
+	 * Makes an InvalidAcknowledgement fault.
+	 * @param rmMsgCtx
+	 * @param storageManager
+	 * @param message
+	 * @throws AxisFault 
+	 */
+	public static void makeInvalidAcknowledgementFault(RMMsgContext rmMsgCtx, 
+			SequenceAcknowledgement sequenceAcknowledgement, AcknowledgementRange acknowledgementRange,
+			StorageManager storageManager) throws AxisFault {
+		FaultData data = new FaultData();
+		int SOAPVersion = SandeshaUtil.getSOAPVersion(rmMsgCtx.getMessageContext().getEnvelope());
+		if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+			data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+		else
+			data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+		data.setType(Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT);
+		data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMsgCtx.getRMNamespaceValue(), 
+				Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT ));
+		data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckFault));
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+
+		OMElement seqAckElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK,
+				rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+	
+		// Set the sequence Id
+		sequenceAcknowledgement.getIdentifier().toOMElement(seqAckElement);
+
+		// Set the Ack Range
+		acknowledgementRange.toOMElement(seqAckElement);
+		
+		data.setDetail(seqAckElement);
+							
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForInvalidAcknowledgement, invalid ACK");
+		getOrSendFault(rmMsgCtx, data);
+  }
+
+	/**
+	 * Makes a Create sequence refused fault
+	 */
+	public static void makeCreateSequenceRefusedFault(RMMsgContext rmMessageContext, 
+																										String detail,
+																										Exception e) 
+	
+	throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::makeCreateSequenceRefusedFault, " + detail);
+		
+		// Return a CreateSequenceRefused error
+		MessageContext messageContext = rmMessageContext.getMessageContext();
+
+		int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+		FaultData data = new FaultData();
+		if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+			data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+		else
+			data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+		data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), 
+				Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED ));
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+		OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+				rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		identifierElement.setText(detail);
+		data.setDetail(identifierElement);
+		data.setDetailString(detail);
+
+		data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
+		
+		data.setType(Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED);
+		
+		data.setExceptionString(SandeshaUtil.getStackTraceFromException(e));
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::makeCreateSequenceRefusedFault");
+		getOrSendFault(rmMessageContext, data);
+	}
+	
+	/**
+	 * Checks if a sequence is terminated and returns a SequenceTerminated fault.
+	 * @param referenceRMMessage
+	 * @param sequenceID
+	 * @param rmdBean
+	 * @return
+	 * @throws AxisFault 
+	 */
+	public static boolean checkForSequenceTerminated(RMMsgContext referenceRMMessage, String sequenceID, RMSequenceBean bean) 
+	
+	throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
+
+		if (bean.isTerminated()) {
+			MessageContext referenceMessage = referenceRMMessage.getMessageContext();
+			FaultData data = new FaultData();
+			int SOAPVersion = SandeshaUtil.getSOAPVersion(referenceMessage.getEnvelope());
+			if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+				data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+			else
+				data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+			data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(), 
+					Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED ));
+			data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID));
+			data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED);
+			
+			SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+			String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
+			OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+					rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			identifierElement.setText(sequenceID);
+			
+			data.setDetail(identifierElement);
+
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed");
+			getOrSendFault(referenceRMMessage, data);
+			return true;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForSequenceClosed");
+		return false;
+  }
+
+	public static boolean checkForSequenceClosed(RMMsgContext referenceRMMessage, String sequenceID,
+			RMDBean rmdBean) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
+
+		if (rmdBean.isClosed()) {
+			MessageContext referenceMessage = referenceRMMessage.getMessageContext();
+			FaultData data = new FaultData();
+			int SOAPVersion = SandeshaUtil.getSOAPVersion(referenceMessage.getEnvelope());
+			if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+				data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+			else
+				data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+			data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(), 
+					Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED ));
+			data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotAcceptMsgAsSequenceClosedFault));
+			data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED);
+			
+			SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+			String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
+			OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+					rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			identifierElement.setText(sequenceID);
+			
+			data.setDetail(identifierElement);
+
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed");
+			getOrSendFault(referenceRMMessage, data);
+			return true;
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::checkForSequenceClosed");
+		return false;
+	}
+	
+	/**
+	 * Adds the necessary Fault elements as properties to the message context.
+	 * Or if this is a SOAP11 Fault, generates the correct RM Fault and sends.
+	 * 
+	 * @param referenceRMMsgContext - Message in reference to which the fault will be generated.
+	 * @param data - data for the fault
+	 * @return - The dummy fault to be thrown out.
+	 * 
+	 * @throws AxisFault
+	 */
+	public static void getOrSendFault(RMMsgContext referenceRMMsgContext, FaultData data) throws AxisFault {
+		SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory();
+		
+		SOAPFaultCode faultCode = factory.createSOAPFaultCode();
+		SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode);
+		
+		SOAPFaultValue faultColdValue = factory.createSOAPFaultValue(faultCode);
+		SOAPFaultValue faultSubcodeValue = factory.createSOAPFaultValue(faultSubCode);
+		
+		faultColdValue.setText(data.getCode());
+		
+		faultSubcodeValue.setText(data.getSubcode());
+
+		faultCode.setSubCode(faultSubCode);
+		
+		SOAPFaultReason reason = factory.createSOAPFaultReason();
+		SOAPFaultText reasonText = factory.createSOAPFaultText();
+		reasonText.setText(data.getReason());
+		
+		SOAPFaultDetail detail = factory.createSOAPFaultDetail();
+		detail.addDetailEntry(data.getDetail());
+		
+		String SOAPNamespaceValue = factory.getSoapVersionURI();
+		
+		if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
+			reason.addSOAPText(reasonText);
+			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
+			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason);
+			referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
+		} else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals (SOAPNamespaceValue)) {
+			reason.setText(data.getReason());
+			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
+			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
+			referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
+			// Need to send this message as the Axis Layer doesn't set the "SequenceFault" header
+			MessageContext faultMessageContext = 
+				MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null);
+
+			SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue());			
+			
+			referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+						
+			// Set the action
+			faultMessageContext.setWSAAction(
+					SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
+			
+			if (log.isDebugEnabled())
+				log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
+			// Send the message
+			AxisEngine engine = new AxisEngine(faultMessageContext.getConfigurationContext());
+			engine.sendFault(faultMessageContext);
+			
+			return;
+		} else {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion);
+			throw new SandeshaException (message);
+		}
+		AxisFault fault = new AxisFault(faultColdValue.getTextAsQName(), data.getReason(), "", "", data.getDetail());
+	  fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
+		throw fault;		
+		
+	}
+
+	public static boolean isRMFault (String faultSubcodeValue) {
+		if (faultSubcodeValue==null)
+			return false;
+		
+		if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equalsIgnoreCase (faultSubcodeValue) ||
+			Sandesha2Constants.SOAPFaults.Subcodes.INVALID_ACKNOWLEDGEMENT.equalsIgnoreCase (faultSubcodeValue) ||	
+			Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED.equalsIgnoreCase (faultSubcodeValue) ||
+			Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equalsIgnoreCase (faultSubcodeValue) ||
+			Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_CLOSED.equalsIgnoreCase (faultSubcodeValue) ||
+			Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equalsIgnoreCase (faultSubcodeValue) ||
+			Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equalsIgnoreCase (faultSubcodeValue) ) {
+		
+			return true;
+		}
+		
+		return false;
+		
+	}
+	
+	private static void manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+	
+		if (log.isErrorEnabled())
+			log.error(fault);
+		
+		SandeshaListener listner = (SandeshaListener) rmMsgCtx.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+		if (listner!=null)
+			listner.onError(fault);
+		
+		// Get the SOAPVersion
+		SOAPFactory factory = (SOAPFactory) rmMsgCtx.getSOAPEnvelope().getOMFactory();		
+		String SOAPNamespaceValue = factory.getSoapVersionURI();
+		
+		String soapFaultSubcode = null;
+		String identifier = null;
+		if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
+			// Log the fault
+			if (faultPart.getCode() != null && 
+					faultPart.getCode().getSubCode() != null &&
+					faultPart.getCode().getSubCode().getValue() != null)
+				
+				soapFaultSubcode = faultPart.getCode().getSubCode().getValue().getTextAsQName().getLocalPart();
+			
+			// Get the identifier, if there is one.
+			SOAPFaultDetail detail = faultPart.getDetail();
+			if (detail != null)
+			{
+				OMElement identifierOM = detail.getFirstChildWithName(new QName(rmMsgCtx.getRMNamespaceValue(), 
+					Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
+			  if (identifierOM != null)
+			  	identifier = identifierOM.getText();
+			}
+			
+		} else {
+			// Need to get the sequence part from the Header.
+			try {
+	      SequenceFault sequenceFault = (SequenceFault)rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE_FAULT);
+	      
+	      // If the sequence fault part is not null, then we have an RM specific fault.
+	      if (sequenceFault != null) {
+	      	soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart();
+	      	
+	      	// Get the identifier - if there is one.
+	      	identifier = sequenceFault.getFaultCode().getDetail();
+	      }
+      } catch (SandeshaException e) {
+      	if (log.isDebugEnabled()) 
+      		log.debug("Unable to process SequenceFault", e);
+      }
+		}
+		
+		if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equals(soapFaultSubcode)) {
+			processCreateSequenceRefusedFault(rmMsgCtx, fault);
+		} else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) ||
+				Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) ) {
+			processSequenceUnknownFault(rmMsgCtx, fault, identifier);
+		}
+	}
+	
+	public static void processMessagesForFaults (RMMsgContext rmMsgCtx) throws AxisFault {
+		
+		SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope();
+		if (envelope==null) 
+			return;
+		
+		SOAPFault faultPart = envelope.getBody().getFault();
+
+		if (faultPart != null) {
+
+			// constructing the fault
+			AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+			manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+		}
+
+	}
+
+	
+	private static AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart) {
+		AxisFault axisFault = new AxisFault(faultPart.getCode(), faultPart.getReason(), faultPart.getNode(), faultPart
+				.getRole(), faultPart.getDetail());
+
+		return axisFault;
+	}
+
+	/** 
+	 * Checks to see if the message number received is == to the Long.MAX_VALUE
+	 * 
+	 * Throws and AxisFault, or sends a Fault message if the condition is met.
+	 * @throws AxisFault 
+	 */
+	public static boolean checkForMessageRolledOver(RMMsgContext rmMessageContext, String sequenceId, long msgNo)
+	
+	throws AxisFault {
+		if (msgNo == Long.MAX_VALUE) {
+			if (log.isDebugEnabled()) 
+				log.debug("Max message number reached " + msgNo);
+			// Return a CreateSequenceRefused error
+			MessageContext messageContext = rmMessageContext.getMessageContext();
+
+			int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+			FaultData data = new FaultData();
+			data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+			data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), 
+					Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER ));
+
+			SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+			OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+					rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			identifierElement.setText(sequenceId);
+			
+			OMElement maxMsgNumber = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.MAX_MSG_NUMBER,
+					rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+			maxMsgNumber.setText(Long.toString(msgNo));
+			
+			data.setDetail(identifierElement);
+			data.setDetail2(maxMsgNumber);
+
+			data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.messageNumberRollover));
+			
+			data.setType(Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER);
+
+			getOrSendFault(rmMessageContext, data);
+			
+			return true;
+		}
+	  return false;
+  }
+	
+	/**
+	 * On receipt of a CreateSequenceRefused fault, terminate the sequence and notify any waiting
+	 * clients of the error.
+	 * @param fault 
+	 * @throws AxisFault 
+	 */
+	private static void processCreateSequenceRefusedFault(RMMsgContext rmMsgCtx, AxisFault fault) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::processCreateSequenceRefusedFault");
+
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+
+		RelatesTo relatesTo = rmMsgCtx.getMessageContext().getRelatesTo();
+		String createSeqMsgId = null;
+		if (relatesTo != null) {
+			createSeqMsgId = relatesTo.getValue();
+		} else {
+			// Work out the related message from the operation context
+			OperationContext context = rmMsgCtx.getMessageContext().getOperationContext();
+			MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+			if(createSeq != null) createSeqMsgId = createSeq.getMessageID();
+		}
+		if(createSeqMsgId == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+			log.error(message);
+			throw new SandeshaException(message);
+		}
+
+		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+		RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+
+		RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
+		if (rmsBean == null) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Unable to find RMSBean");
+			return;
+		}
+		
+	/*	if (rmsBean.getLastSendError() == null) {
+			// Indicate that there was an error when sending the Create Sequence.
+			rmsBean.setLastSendError(fault);
+			
+			// Update the RMSBean
+			rmsBeanMgr.update(rmsBean);
+			if (log.isDebugEnabled())
+				log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Allowing another CreateSequence attempt");
+			return;
+		}
+*/
+		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+		if (createSequenceSenderBean == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+		// deleting the create sequence entry.
+		retransmitterMgr.delete(createSeqMsgId);
+			
+		// Notify the clients of a failure
+		notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+		
+		rmMsgCtx.pause();
+		
+		// Cleanup sending side.
+		if (log.isDebugEnabled())
+			log.debug("Terminating sending sequence " + rmsBean);
+		TerminateManager.terminateSendingSide(rmsBean, storageManager);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
+	}
+
+	/**
+	 * If the RMD returns a SequenceTerminated, or an Unknown sequence fault, then we should 
+	 * mark the RMS Sequence as terminated and notify clients of the error.
+	 * 
+	 * @param rmMsgCtx
+	 * @param fault
+	 * @param identifier 
+	 */
+	private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID);
+
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+		
+		// Find the rmsBean
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+		if (rmsBean != null) {
+		
+			// Notify the clients of a failure
+			notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+			
+			rmMsgCtx.pause();
+			
+			// Cleanup sending side.
+			if (log.isDebugEnabled())
+				log.debug("Terminating sending sequence " + rmsBean);
+			TerminateManager.terminateSendingSide(rmsBean, storageManager);
+			
+			// Update the last activated time.
+			rmsBean.setLastActivatedTime(System.currentTimeMillis());
+			
+			// Update the bean in the map
+			storageManager.getRMSBeanMgr().update(rmsBean);
+		}
+		else {
+			RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
+			if (rmdBean != null) {
+				rmMsgCtx.pause();
+				
+				// Cleanup sending side.
+				if (log.isDebugEnabled())
+					log.debug("Terminating sending sequence " + rmdBean);
+				TerminateManager.cleanReceivingSideOnTerminateMessage(configCtx, rmdBean.getSequenceID(), storageManager);
+				
+				// Update the last activated time.
+				rmdBean.setLastActivatedTime(System.currentTimeMillis());
+				
+				// Update the bean in the map
+				storageManager.getRMDBeanMgr().update(rmdBean);
+			
+			}
+			else {
+				if (log.isDebugEnabled())
+					log.debug("Exit: FaultManager::processSequenceUnknownFault Unable to find sequence");
+				return;
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: FaultManager::processSequenceUnknownFault");	  
+  }
+
+	static void notifyClientsOfFault(String internalSequenceId, 
+			StorageManager storageManager, ConfigurationContext configCtx, AxisFault fault) throws SandeshaStorageException {
+		// Locate and update all of the messages for this sequence, now that we know
+		// the sequence id.
+		SenderBean target = new SenderBean();
+		target.setInternalSequenceID(internalSequenceId);
+		
+		Iterator iterator = storageManager.getSenderBeanMgr().find(target).iterator();
+		while (iterator.hasNext()) {
+			SenderBean tempBean = (SenderBean) iterator.next();
+
+			String messageStoreKey = tempBean.getMessageContextRefKey();
+			
+			// Retrieve the message context.
+			MessageContext context = storageManager.retrieveMessageContext(messageStoreKey, configCtx);
+			
+      AxisOperation axisOperation = context.getAxisOperation();
+      if (axisOperation != null)
+      {
+        MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
+        if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver))
+        {
+          Callback callback = ((CallbackReceiver)msgReceiver).lookupCallback(context.getMessageID());
+          if (callback != null)
+          {
+            callback.onError(fault);
+          }
+        }
+      }
+		}
+
+	}
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,128 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * This is used to adjust retransmission infoamation after each time the message
+ * is sent.
+ */
+
+public class MessageRetransmissionAdjuster {
+
+	private static final Log log = LogFactory.getLog(MessageRetransmissionAdjuster.class);
+
+	public static boolean adjustRetransmittion(RMMsgContext rmMsgCtx, SenderBean retransmitterBean, ConfigurationContext configContext,
+			StorageManager storageManager) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: MessageRetransmissionAdjuster::adjustRetransmittion");
+
+		String internalSequenceID = retransmitterBean.getInternalSequenceID();
+		String sequenceID = retransmitterBean.getSequenceID();
+
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
+		rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, sequenceID);
+		
+		// operation is the lowest level Sandesha2 could be attached.
+		SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
+
+		retransmitterBean.setSentCount(retransmitterBean.getSentCount() + 1);
+		adjustNextRetransmissionTime(retransmitterBean, propertyBean);
+
+		int maxRetransmissionAttempts = propertyBean.getMaximumRetransmissionCount();
+		
+		// We can only time out sequences if we can identify the correct sequence, and
+		// we need the internal sequence id for that.
+		boolean continueSending = true;
+		if(internalSequenceID != null) {
+			boolean timeOutSequence = false;
+			if (maxRetransmissionAttempts >= 0 && retransmitterBean.getSentCount() > maxRetransmissionAttempts)
+				timeOutSequence = true;
+
+			if (timeOutSequence) {
+	
+				retransmitterBean.setSend(false);
+
+				// Warn the user that the sequence has timed out
+				//if (log.isWarnEnabled())
+				//	log.warn();
+
+				// Only messages of outgoing sequences get retransmitted. So named
+				// following method according to that.
+				
+				SequenceManager.finalizeTimedOutSequence(internalSequenceID, rmMsgCtx.getMessageContext(), storageManager);
+				continueSending = false;
+			}
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: MessageRetransmissionAdjuster::adjustRetransmittion, " + continueSending);
+		return continueSending;
+	}
+
+	/**
+	 * This sets the next time the message has to be retransmitted. This uses
+	 * the base retransmission interval and exponentialBackoff properties to
+	 * calculate the correct time.
+	 * 
+	 * @param retransmitterBean
+	 * @param policyBean
+	 * @return
+	 */
+	private static SenderBean adjustNextRetransmissionTime(SenderBean retransmitterBean, SandeshaPolicyBean propertyBean) throws SandeshaException {
+
+		int count = retransmitterBean.getSentCount();
+
+		long baseInterval = propertyBean.getRetransmissionInterval();
+
+		long newInterval = baseInterval;
+		if (propertyBean.isExponentialBackoff()) {
+			newInterval = generateNextExponentialBackedoffDifference(count, baseInterval);
+		}
+
+		long newTimeToSend = 0;
+
+		long timeNow = System.currentTimeMillis();
+		newTimeToSend = timeNow + newInterval;
+
+		retransmitterBean.setTimeToSend(newTimeToSend);
+
+		return retransmitterBean;
+	}
+
+	private static long generateNextExponentialBackedoffDifference(int count, long initialInterval) {
+		long interval = initialInterval;
+		for (int i = 1; i < count; i++) {
+			interval = interval * 2;
+		}
+
+		return interval;
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,253 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.MessageContext;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.CloseSequenceResponse;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.RMElements;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * This class is used to create an RMMessageContext out of an MessageContext.
+ */
+
+public class MsgInitializer {
+
+	/**
+	 * Called to create a rmMessageContext out of an message context. Finds out
+	 * things like rm version and message type as well.
+	 * 
+	 * @param ctx
+	 * @param assumedRMNamespace
+	 *            this is used for validation (to find out weather the
+	 *            rmNamespace of the current message is equal to the regietered
+	 *            rmNamespace of the sequence). If null validation will not
+	 *            happen.
+	 * 
+	 * @return
+	 * @throws SandeshaException
+	 */
+	public static RMMsgContext initializeMessage(MessageContext ctx) throws AxisFault {
+		RMMsgContext rmMsgCtx = new RMMsgContext(ctx);
+
+		populateRMMsgContext(ctx, rmMsgCtx);
+		validateMessage(rmMsgCtx);
+		return rmMsgCtx;
+	}
+
+	/**
+	 * Adds the message parts the the RMMessageContext.
+	 * 
+	 * @param msgCtx
+	 * @param rmMsgContext
+	 */
+	private static void populateRMMsgContext(MessageContext msgCtx, RMMsgContext rmMsgContext) throws AxisFault {
+
+		// if client side and the addressing version is not set. assuming the
+		// default addressing version
+		String addressingNamespace = (String) msgCtx.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+		if (addressingNamespace == null && !msgCtx.isServerSide())
+			addressingNamespace = AddressingConstants.Final.WSA_NAMESPACE;
+
+		RMElements elements = new RMElements(addressingNamespace);
+		elements.fromSOAPEnvelope(msgCtx.getEnvelope(), msgCtx.getWSAAction());
+
+		String rmNamespace = null;
+
+		if (elements.getCreateSequence() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ, elements.getCreateSequence());
+			rmNamespace = elements.getCreateSequence().getNamespaceValue();
+		}
+
+		if (elements.getCreateSequenceResponse() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE, elements
+					.getCreateSequenceResponse());
+			rmNamespace = elements.getCreateSequenceResponse().getNamespaceValue();
+		}
+
+		if (elements.getSequence() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, elements.getSequence());
+			rmNamespace = elements.getSequence().getNamespaceValue();
+		}
+
+		//In case of ack messages RM Namespace is decided based on the sequenceId of the last 
+		//sequence Ack. In other words Sandesha2 does not expect to receive two SequenceAcknowledgements
+		//of different RM specifications in the same incoming message
+		for (Iterator iter = elements.getSequenceAcknowledgements();iter.hasNext();) {
+			SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) iter.next();
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, sequenceAck);
+			rmNamespace = sequenceAck.getNamespaceValue();
+		}
+
+		if (elements.getTerminateSequence() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ, elements.getTerminateSequence());
+			rmNamespace = elements.getTerminateSequence().getNamespaceValue();
+		}
+
+		if (elements.getTerminateSequenceResponse() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE, elements
+					.getTerminateSequenceResponse());
+			rmNamespace = elements.getTerminateSequenceResponse().getNamespaceValue();
+		}
+
+		//In case of ack request messages RM Namespace is decided based on the sequenceId of the last 
+		//ack request.
+		for (Iterator iter = elements.getAckRequests();iter.hasNext();) {
+			AckRequested ackRequest = (AckRequested) iter.next();
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequest);
+			rmNamespace = ackRequest.getNamespaceValue();
+		}
+
+		if (elements.getCloseSequence() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE, elements.getCloseSequence());
+			rmNamespace = elements.getCloseSequence().getNamespaceValue();
+		}
+
+		if (elements.getCloseSequenceResponse() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE_RESPONSE, elements
+					.getCloseSequenceResponse());
+			rmNamespace = elements.getCloseSequenceResponse().getNamespaceValue();
+		}
+		
+		if (elements.getUsesSequenceSTR() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.USES_SEQUENCE_STR, elements
+					.getUsesSequenceSTR());
+		}
+		
+		if (elements.getMakeConnection() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION,
+					elements.getMakeConnection());
+		}
+		
+		if (elements.getMessagePending() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING,
+					elements.getMessagePending());
+		}
+		
+		if (elements.getSequenceFault() != null) {
+			rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE_FAULT,
+					elements.getSequenceFault());
+		}
+
+		rmMsgContext.setRMNamespaceValue(rmNamespace);
+
+	}
+
+	/**
+	 * This is used to validate the message. Also set an Message type. Possible
+	 * types are given in the Sandesha2Constants.MessageTypes interface.
+	 * 
+	 * @param rmMsgCtx
+	 * @return
+	 * @throws SandeshaException
+	 */
+	private static boolean validateMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+		String sequenceID = null;
+
+		CreateSequence createSequence = (CreateSequence) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.CREATE_SEQ);
+		CreateSequenceResponse createSequenceResponse = (CreateSequenceResponse) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+		TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+		TerminateSequenceResponse terminateSequenceResponse = (TerminateSequenceResponse) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+		Iterator sequenceAcknowledgementsIter = rmMsgCtx.getMessageParts(
+				Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+		Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.SEQUENCE);
+		Iterator ackRequestedIter = rmMsgCtx.getMessageParts(
+				Sandesha2Constants.MessageParts.ACK_REQUEST);
+		CloseSequence closeSequence = (CloseSequence) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+		CloseSequenceResponse closeSequenceResponse = (CloseSequenceResponse) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.CLOSE_SEQUENCE_RESPONSE);
+		MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(
+				Sandesha2Constants.MessageParts.MAKE_CONNECTION);
+
+		// Setting message type.
+		if (createSequence != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+		} else if (createSequenceResponse != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE);
+			sequenceID = createSequenceResponse.getIdentifier().getIdentifier();
+		} else if (terminateSequence != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+			sequenceID = terminateSequence.getIdentifier().getIdentifier();
+		} else if (terminateSequenceResponse != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE);
+			sequenceID = terminateSequenceResponse.getIdentifier().getIdentifier();
+		} else if (rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE) != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+			sequenceID = sequence.getIdentifier().getIdentifier();
+		} else if (sequenceAcknowledgementsIter.hasNext()) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAcknowledgementsIter.next();
+			
+			//if there is only on sequenceAck, sequenceId will be set. Otherwise it will not be.
+			if (!sequenceAcknowledgementsIter.hasNext())
+				sequenceID = sequenceAcknowledgement.getIdentifier().getIdentifier();
+		} else if (ackRequestedIter.hasNext()) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
+			AckRequested ackRequest = (AckRequested) ackRequestedIter.next();
+
+			//if there is only on sequenceAck, sequenceId will be set. Otherwise it will not be.
+			if (!ackRequestedIter.hasNext())
+				sequenceID = ackRequest.getIdentifier().getIdentifier();
+		} else if (closeSequence != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE);
+			sequenceID = closeSequence.getIdentifier().getIdentifier();
+		} else if (closeSequenceResponse != null) {
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE_RESPONSE);
+			sequenceID = closeSequenceResponse.getIdentifier().getIdentifier(); 
+		} else if (makeConnection != null){
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
+			if (makeConnection.getIdentifier()!=null) {
+				sequenceID = makeConnection.getIdentifier().getIdentifier();
+			} else if (makeConnection.getAddress()!=null){
+				//TODO get sequenceId based on the anonymous address.
+			} else {
+				throw new SandeshaException (
+						"Invalid MakeConnection message. Either Address or Identifier must be present");
+			}
+		} else
+			rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.UNKNOWN);
+		
+		if (sequenceID!=null)
+			rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceID);
+
+		return true;
+	}
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org