You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by di...@apache.org on 2007/08/09 02:58:01 UTC

svn commit: r564063 [7/16] - in /webservices/sandesha/trunk/java: ./ modules/client/ modules/core/ modules/core/src/main/java/org/apache/sandesha2/ modules/core/src/main/java/org/apache/sandesha2/client/ modules/core/src/main/java/org/apache/sandesha2/...

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Wed Aug  8 17:57:51 2007
@@ -1,515 +1,515 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.workers;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.transport.RequestResponseTransport;
-import org.apache.axis2.transport.TransportUtils;
-import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
-import org.apache.axis2.wsdl.WSDLConstants;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.policy.SandeshaPolicyBean;
-import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
-
-/**
- * This is responsible for sending and re-sending messages of Sandesha2. This
- * represent a thread that keep running all the time. This keep looking at the
- * Sender table to find out any entries that should be sent.
- */
-
-public class Sender extends SandeshaThread {
-
-	private static final Log log = LogFactory.getLog(Sender.class);
-
-	// If this sender is working for several sequences, we use round-robin to
-	// try and give them all a chance to invoke messages.
-	int nextIndex = 0;
-	boolean processedMessage = false;
-	
-	public Sender () {
-		super(Sandesha2Constants.SENDER_SLEEP_TIME);
-	}
-
-	protected boolean internalRun() {
-		if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
-
-		Transaction transaction = null;
-		boolean sleep = false;
-
-		try {
-			// Pick a sequence using a round-robin approach
-			ArrayList allSequencesList = getSequences();
-			int size = allSequencesList.size();
-
-			if (log.isDebugEnabled())
-				log.debug("Choosing one from " + size + " sequences");
-			if(nextIndex >= size) {
-				nextIndex = 0;
-
-				// We just looped over the set of sequences. If we didn't process any
-				// messages on this loop then we sleep before the next one
-				if(size == 0 || !processedMessage) {
-					sleep = true;
-				}
-				processedMessage = false;
-				
-				// At this point - delete any sequences that have timed out, or been terminated.
-				deleteTerminatedSequences(storageManager);
-
-				// Also clean up and sender beans that are not yet eligible for sending, but
-				// are blocking the transport threads.
-				unblockTransportThreads(storageManager);
-				
-				// Finally, check for messages that can only be serviced by polling, and warn
-				// the user if they are too old
-				checkForOrphanMessages(storageManager);
-
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
-				return sleep;
-			}
-			
-			transaction = storageManager.getTransaction();
-
-			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
-			String sequenceId = entry.getSequenceId();
-			if (log.isDebugEnabled())
-				log.debug("Chose sequence " + sequenceId);
-
-			String rmVersion = null;
-			// Check that the sequence is still valid
-			boolean found = false;
-			if(entry.isRmSource()) {
-				RMSBean matcher = new RMSBean();
-				matcher.setInternalSequenceID(sequenceId);
-				matcher.setTerminated(false);
-				RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
-				if(rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
-					sequenceId = rms.getSequenceID();					
-					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))					
-						SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
-					else
-						found = true;
-					rmVersion = rms.getRMVersion();
-				}
-				
-			} else {
-				RMDBean matcher = new RMDBean();
-				matcher.setSequenceID(sequenceId);
-				matcher.setTerminated(false);
-				RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
-				if(rmd != null) {
-					found = true;
-					rmVersion = rmd.getRMVersion();
-				}
-			}
-			if (!found) {
-				stopThreadForSequence(sequenceId, entry.isRmSource());
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
-				
-				if(transaction != null && transaction.isActive()) {
-					transaction.commit();
-					transaction = null;
-				}
-				
-				return false;
-			}
-			
-			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
-			SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
-			
-			if (senderBean == null) {
-				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
-				
-				if(transaction != null && transaction.isActive()) {
-					transaction.commit();
-					transaction = null;
-				}
-				
-				return false; // Move on to the next sequence in the list
-			}
-
-			// work Id is used to define the piece of work that will be
-			// assigned to the Worker thread,
-			// to handle this Sender bean.
-			
-			//workId contains a timeTiSend part to cater for retransmissions.
-			//This will cause retransmissions to be treated as new work.
-			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
-
-			// check weather the bean is already assigned to a worker.
-			if (getWorkerLock().isWorkPresent(workId)) {
-				// As there is already a worker running we are probably looping
-				// too fast, so sleep on the next loop.
-				if (log.isDebugEnabled()) {
-					String message = SandeshaMessageHelper.getMessage(
-									SandeshaMessageKeys.workAlreadyAssigned,
-									workId);
-					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
-				}
-				
-				if(transaction != null && transaction.isActive()) {
-					transaction.commit();
-					transaction = null;
-				}
-				
-				return true;
-			}
-
-			//commiting the transaction here to release resources early.
-			if(transaction != null && transaction.isActive()) transaction.commit();
-			transaction = null;
-
-			// start a worker which will work on this messages.
-			SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
-			worker.setLock(getWorkerLock());
-			worker.setWorkId(workId);
-			threadPool.execute(worker);
-
-			// adding the workId to the lock after assigning it to a thread
-			// makes sure
-			// that all the workIds in the Lock are handled by threads.
-			getWorkerLock().addWork(workId);
-
-			// If we got to here then we found work to do on the sequence, so we should
-			// remember not to sleep at the end of the list of sequences.
-			processedMessage = true;
-			
-		} catch (Exception e) {
-
-			// TODO : when this is the client side throw the exception to
-			// the client when necessary.
-
-			
-			//TODO rollback only if a SandeshaStorageException.
-			//This allows the other Exceptions to be used within the Normal flow.
-			
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
-			log.debug(message, e);
-		} finally {
-			if (transaction != null && transaction.isActive()) {
-				try {
-					transaction.rollback();
-					transaction = null;
-				} catch (Exception e) {
-					String message = SandeshaMessageHelper
-							.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
-					log.debug(message, e);
-				}
-			}
-		}
-		if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
-		return false;
-	}
-
-	/**
-	 * Finds any RMDBeans that have not been used inside the set InnactivityTimeoutInterval
-	 * 
-	 * Iterates through RMSBeans and RMDBeans that have been terminated or timed out and 
-	 * deletes them.
-	 *
-	 */
-	private void deleteTerminatedSequences(StorageManager storageManager) {
-		if (log.isDebugEnabled()) 
-			log.debug("Enter: Sender::deleteTerminatedSequences");
-
-		RMSBean finderBean = new RMSBean();
-		finderBean.setTerminated(true);
-		
-		Transaction transaction = null;
-		
-		try {
-			transaction = storageManager.getTransaction();
-			
-			SandeshaPolicyBean propertyBean = 
-				SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());			
-
-    	long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
-    	if (deleteTime < 0)
-    		deleteTime = 0;
-
-    	if (deleteTime > 0) {
-				// Find terminated sequences.
-		    List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-		    
-		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-		    
-		    finderBean.setTerminated(false);
-		    finderBean.setTimedOut(true);
-		    
-		    // Find timed out sequences
-		    rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-		    	    
-		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-		    
-		    // Remove any terminated RMDBeans.
-		    RMDBean finderRMDBean = new RMDBean();
-		    finderRMDBean.setTerminated(true);
-		    
-		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-	
-		    Iterator beans = rmdBeans.iterator();
-		    while (beans.hasNext()) {
-		    	RMDBean rmdBean = (RMDBean)beans.next();
-		    	
-		    	long timeNow = System.currentTimeMillis();
-		    	long lastActivated = rmdBean.getLastActivatedTime();
-	
-		    	// delete sequences that have been timedout or deleted for more than 
-		    	// the SequenceRemovalTimeoutInterval
-		    	if ((lastActivated + deleteTime) < timeNow) {
-		    		if (log.isDebugEnabled())
-		    			log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
-		    		storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
-		    	}	    		    	
-		    }
-    	}
-
-	    // Terminate RMD Sequences that have been inactive.			
-			if (propertyBean.getInactivityTimeoutInterval() > 0) {
-		    RMDBean finderRMDBean = new RMDBean();
-		    finderRMDBean.setTerminated(false);
-				
-		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
-			
-		    Iterator beans = rmdBeans.iterator();
-		    while (beans.hasNext()) {
-		    	RMDBean rmdBean = (RMDBean)beans.next();
-		    	
-		    	long timeNow = System.currentTimeMillis();
-		    	long lastActivated = rmdBean.getLastActivatedTime();
-		    	
-		    	if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
-		    		// Terminate
-		    		rmdBean.setTerminated(true);
-		    		rmdBean.setLastActivatedTime(timeNow);
-		    		if (log.isDebugEnabled())
-		    			log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
-		    		storageManager.getRMDBeanMgr().update(rmdBean);
-		    	}	    		    	
-		    }
-			} 	    
-	    
-			if(transaction != null && transaction.isActive()) transaction.commit();
-			
-		} catch (SandeshaException e) {
-			if (log.isErrorEnabled())
-				log.error(e);
-		} finally {
-			if(transaction != null && transaction.isActive()) {
-				try {
-					transaction.rollback();
-				} catch (SandeshaStorageException e) {
-					if (log.isDebugEnabled())
-						log.debug("Caught exception rolling back transaction", e);
-				}
-			}
-		}
-		
-		if (log.isDebugEnabled()) 
-			log.debug("Exit: Sender::deleteTerminatedSequences");
-	}
-	
-	private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime) 
-
-	throws SandeshaStorageException {		
-		if (log.isDebugEnabled()) 
-			log.debug("Enter: Sender::deleteRMSBeans");
-
-    Iterator beans = rmsBeans.iterator();
-    
-    while (beans.hasNext())
-    {
-    	RMSBean rmsBean = (RMSBean)beans.next();
-    	long timeNow = System.currentTimeMillis();
-    	long lastActivated = rmsBean.getLastActivatedTime();
-    	// delete sequences that have been timedout or deleted for more than 
-    	// the SequenceRemovalTimeoutInterval
-   	
-    	if ((lastActivated + deleteTime) < timeNow) {
-    		if (log.isDebugEnabled())
-    			log.debug("Removing RMSBean " + rmsBean);
-    		storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
-    		storageManager.removeMessageContext( rmsBean.getReferenceMessageStoreKey() );
-    	}	    	
-    }
-
-		if (log.isDebugEnabled()) 
-			log.debug("Exit: Sender::deleteRMSBeans");
-	}
-
-	private void unblockTransportThreads(StorageManager manager)
-	throws SandeshaStorageException
-	{
-		if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
-
-		Transaction transaction = null;
-		try {
-			transaction = manager.getTransaction();
-			
-			// This finder will look for beans that have been locking the transport for longer than
-			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
-			// for us.
-			SenderBean finder = new SenderBean();
-			finder.setSend(false);
-			finder.setTransportAvailable(true);
-			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-			
-			List beans = manager.getSenderBeanMgr().find(finder);
-			Iterator beanIter = beans.iterator();
-			while(beanIter.hasNext()) {
-				// The beans we have found are assigned to an internal sequence id, but the create
-				// sequence has not completed yet (and perhaps never will). Server-side, most of the
-				// info that we can usefully print is associated with the inbound sequence that generated
-				// this message.
-				SenderBean bean = (SenderBean) beanIter.next();
-				
-				// Load the message, so that we can free the transport (if there is one there). The
-				// case we are trying to free up is when there is a request-response transport, and
-				// it's still there waiting.
-				MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
-
-				RequestResponseTransport t = null;
-				MessageContext inMsg = null;
-				OperationContext op = msgCtx.getOperationContext();
-				if (op != null)
-					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-				if (inMsg != null)
-					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-	
-				if((t != null && !RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
-					if(log.isWarnEnabled()) {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
-						log.warn(message);
-					}
-					// If the message is a reply, then the request may need to be acked. Rather
-					// than just return a HTTP 202, we should try to send an ack.
-					boolean sendAck = false;
-					RMDBean inbound = null;
-					String inboundSeq = bean.getInboundSequenceId();
-					if(inboundSeq != null) 
-						inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
-					
-					if(inbound != null) {
-						String acksTo = inbound.getAcksToEPR();
-						EndpointReference acksToEPR = new EndpointReference(acksTo);
-						if(acksTo == null || acksToEPR.hasAnonymousAddress())
-							sendAck = true;
-					}
-					
-					if(sendAck) {
-						RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
-								rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
-						AcknowledgementManager.sendAckNow(ackRMMsgCtx);
-						TransportUtils.setResponseWritten(msgCtx, true);
-					} else {
-						TransportUtils.setResponseWritten(msgCtx, false);
-					}
-	
-					// Mark the bean so that we know the transport is missing, and reset the send time
-					bean.setTransportAvailable(false);
-					bean.setTimeToSend(System.currentTimeMillis());
-					
-					// Update the bean
-					manager.getSenderBeanMgr().update(bean);
-				}
-			}
-	
-			if(transaction != null && transaction.isActive()) transaction.commit();
-			transaction = null;
-			
-		} catch(Exception e) {
-			// There isn't much we can do here, so log the exception and continue.
-			if(log.isDebugEnabled()) log.debug("Exception", e);
-		} finally {
-			if(transaction != null && transaction.isActive()) transaction.rollback();
-		}
-		
-		if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
-	}
-		
-	private void checkForOrphanMessages(StorageManager manager)
-	throws SandeshaStorageException
-	{
-		if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
-		
-		Transaction tran = null;
-		try {
-			tran = manager.getTransaction();
-	
-			// This finder will look for beans that should have been sent, but could not be sent
-			// because they need a MakeConnection message to come in to pick it up. We also factor
-			// in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
-			SenderBean finder = new SenderBean();
-			finder.setSend(true);
-			finder.setTransportAvailable(false);
-			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-			
-			List beans = manager.getSenderBeanMgr().find(finder);
-			Iterator beanIter = beans.iterator();
-			while(beanIter.hasNext()) {
-				SenderBean bean = (SenderBean) beanIter.next();
-				
-				// Emit a message to warn the user that MakeConnections are not arriving to pick
-				// messages up
-				if(log.isWarnEnabled()) {
-					String messageType = Integer.toString(bean.getMessageType());
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, messageType);
-					log.warn(message);
-				}
-				
-				// Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
-				bean.setTimeToSend(System.currentTimeMillis());
-				manager.getSenderBeanMgr().update(bean);
-			}
-	
-			if(tran != null && tran.isActive()) tran.commit();
-			tran = null;
-	
-		} catch(Exception e) {
-			// There isn't much we can do here, so log the exception and continue.
-			if(log.isDebugEnabled()) log.debug("Exception", e);
-		} finally {
-			if(tran != null && tran.isActive()) tran.rollback();
-		}
-		
-		if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
-	}
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+
+/**
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represent a thread that keep running all the time. This keep looking at the
+ * Sender table to find out any entries that should be sent.
+ */
+
+public class Sender extends SandeshaThread {
+
+	private static final Log log = LogFactory.getLog(Sender.class);
+
+	// If this sender is working for several sequences, we use round-robin to
+	// try and give them all a chance to invoke messages.
+	int nextIndex = 0;
+	boolean processedMessage = false;
+	
+	public Sender () {
+		super(Sandesha2Constants.SENDER_SLEEP_TIME);
+	}
+
+	protected boolean internalRun() {
+		if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
+
+		Transaction transaction = null;
+		boolean sleep = false;
+
+		try {
+			// Pick a sequence using a round-robin approach
+			ArrayList allSequencesList = getSequences();
+			int size = allSequencesList.size();
+
+			if (log.isDebugEnabled())
+				log.debug("Choosing one from " + size + " sequences");
+			if(nextIndex >= size) {
+				nextIndex = 0;
+
+				// We just looped over the set of sequences. If we didn't process any
+				// messages on this loop then we sleep before the next one
+				if(size == 0 || !processedMessage) {
+					sleep = true;
+				}
+				processedMessage = false;
+				
+				// At this point - delete any sequences that have timed out, or been terminated.
+				deleteTerminatedSequences(storageManager);
+
+				// Also clean up and sender beans that are not yet eligible for sending, but
+				// are blocking the transport threads.
+				unblockTransportThreads(storageManager);
+				
+				// Finally, check for messages that can only be serviced by polling, and warn
+				// the user if they are too old
+				checkForOrphanMessages(storageManager);
+
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
+				return sleep;
+			}
+			
+			transaction = storageManager.getTransaction();
+
+			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+			String sequenceId = entry.getSequenceId();
+			if (log.isDebugEnabled())
+				log.debug("Chose sequence " + sequenceId);
+
+			String rmVersion = null;
+			// Check that the sequence is still valid
+			boolean found = false;
+			if(entry.isRmSource()) {
+				RMSBean matcher = new RMSBean();
+				matcher.setInternalSequenceID(sequenceId);
+				matcher.setTerminated(false);
+				RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
+				if(rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
+					sequenceId = rms.getSequenceID();					
+					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))					
+						SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
+					else
+						found = true;
+					rmVersion = rms.getRMVersion();
+				}
+				
+			} else {
+				RMDBean matcher = new RMDBean();
+				matcher.setSequenceID(sequenceId);
+				matcher.setTerminated(false);
+				RMDBean rmd = storageManager.getRMDBeanMgr().findUnique(matcher);
+				if(rmd != null) {
+					found = true;
+					rmVersion = rmd.getRMVersion();
+				}
+			}
+			if (!found) {
+				stopThreadForSequence(sequenceId, entry.isRmSource());
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, sequence has ended");
+				
+				if(transaction != null && transaction.isActive()) {
+					transaction.commit();
+					transaction = null;
+				}
+				
+				return false;
+			}
+			
+			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
+			SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);
+			
+			if (senderBean == null) {
+				if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, no message for this sequence");
+				
+				if(transaction != null && transaction.isActive()) {
+					transaction.commit();
+					transaction = null;
+				}
+				
+				return false; // Move on to the next sequence in the list
+			}
+
+			// work Id is used to define the piece of work that will be
+			// assigned to the Worker thread,
+			// to handle this Sender bean.
+			
+			//workId contains a timeTiSend part to cater for retransmissions.
+			//This will cause retransmissions to be treated as new work.
+			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
+
+			// check weather the bean is already assigned to a worker.
+			if (getWorkerLock().isWorkPresent(workId)) {
+				// As there is already a worker running we are probably looping
+				// too fast, so sleep on the next loop.
+				if (log.isDebugEnabled()) {
+					String message = SandeshaMessageHelper.getMessage(
+									SandeshaMessageKeys.workAlreadyAssigned,
+									workId);
+					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
+				}
+				
+				if(transaction != null && transaction.isActive()) {
+					transaction.commit();
+					transaction = null;
+				}
+				
+				return true;
+			}
+
+			//commiting the transaction here to release resources early.
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+
+			// start a worker which will work on this messages.
+			SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
+			worker.setLock(getWorkerLock());
+			worker.setWorkId(workId);
+			threadPool.execute(worker);
+
+			// adding the workId to the lock after assigning it to a thread
+			// makes sure
+			// that all the workIds in the Lock are handled by threads.
+			getWorkerLock().addWork(workId);
+
+			// If we got to here then we found work to do on the sequence, so we should
+			// remember not to sleep at the end of the list of sequences.
+			processedMessage = true;
+			
+		} catch (Exception e) {
+
+			// TODO : when this is the client side throw the exception to
+			// the client when necessary.
+
+			
+			//TODO rollback only if a SandeshaStorageException.
+			//This allows the other Exceptions to be used within the Normal flow.
+			
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
+			log.debug(message, e);
+		} finally {
+			if (transaction != null && transaction.isActive()) {
+				try {
+					transaction.rollback();
+					transaction = null;
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper
+							.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
+					log.debug(message, e);
+				}
+			}
+		}
+		if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+		return false;
+	}
+
+	/**
+	 * Finds any RMDBeans that have not been used inside the set InnactivityTimeoutInterval
+	 * 
+	 * Iterates through RMSBeans and RMDBeans that have been terminated or timed out and 
+	 * deletes them.
+	 *
+	 */
+	private void deleteTerminatedSequences(StorageManager storageManager) {
+		if (log.isDebugEnabled()) 
+			log.debug("Enter: Sender::deleteTerminatedSequences");
+
+		RMSBean finderBean = new RMSBean();
+		finderBean.setTerminated(true);
+		
+		Transaction transaction = null;
+		
+		try {
+			transaction = storageManager.getTransaction();
+			
+			SandeshaPolicyBean propertyBean = 
+				SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());			
+
+    	long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+    	if (deleteTime < 0)
+    		deleteTime = 0;
+
+    	if (deleteTime > 0) {
+				// Find terminated sequences.
+		    List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+		    
+		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+		    
+		    finderBean.setTerminated(false);
+		    finderBean.setTimedOut(true);
+		    
+		    // Find timed out sequences
+		    rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+		    	    
+		    deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
+		    
+		    // Remove any terminated RMDBeans.
+		    RMDBean finderRMDBean = new RMDBean();
+		    finderRMDBean.setTerminated(true);
+		    
+		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+	
+		    Iterator beans = rmdBeans.iterator();
+		    while (beans.hasNext()) {
+		    	RMDBean rmdBean = (RMDBean)beans.next();
+		    	
+		    	long timeNow = System.currentTimeMillis();
+		    	long lastActivated = rmdBean.getLastActivatedTime();
+	
+		    	// delete sequences that have been timedout or deleted for more than 
+		    	// the SequenceRemovalTimeoutInterval
+		    	if ((lastActivated + deleteTime) < timeNow) {
+		    		if (log.isDebugEnabled())
+		    			log.debug("Deleting RMDBean " + deleteTime + " : " + rmdBean);
+		    		storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+		    	}	    		    	
+		    }
+    	}
+
+	    // Terminate RMD Sequences that have been inactive.			
+			if (propertyBean.getInactivityTimeoutInterval() > 0) {
+		    RMDBean finderRMDBean = new RMDBean();
+		    finderRMDBean.setTerminated(false);
+				
+		    List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+			
+		    Iterator beans = rmdBeans.iterator();
+		    while (beans.hasNext()) {
+		    	RMDBean rmdBean = (RMDBean)beans.next();
+		    	
+		    	long timeNow = System.currentTimeMillis();
+		    	long lastActivated = rmdBean.getLastActivatedTime();
+		    	
+		    	if ((lastActivated + propertyBean.getInactivityTimeoutInterval()) < timeNow) {
+		    		// Terminate
+		    		rmdBean.setTerminated(true);
+		    		rmdBean.setLastActivatedTime(timeNow);
+		    		if (log.isDebugEnabled())
+		    			log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + rmdBean);
+		    		storageManager.getRMDBeanMgr().update(rmdBean);
+		    	}	    		    	
+		    }
+			} 	    
+	    
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			
+		} catch (SandeshaException e) {
+			if (log.isErrorEnabled())
+				log.error(e);
+		} finally {
+			if(transaction != null && transaction.isActive()) {
+				try {
+					transaction.rollback();
+				} catch (SandeshaStorageException e) {
+					if (log.isDebugEnabled())
+						log.debug("Caught exception rolling back transaction", e);
+				}
+			}
+		}
+		
+		if (log.isDebugEnabled()) 
+			log.debug("Exit: Sender::deleteTerminatedSequences");
+	}
+	
+	private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime) 
+
+	throws SandeshaStorageException {		
+		if (log.isDebugEnabled()) 
+			log.debug("Enter: Sender::deleteRMSBeans");
+
+    Iterator beans = rmsBeans.iterator();
+    
+    while (beans.hasNext())
+    {
+    	RMSBean rmsBean = (RMSBean)beans.next();
+    	long timeNow = System.currentTimeMillis();
+    	long lastActivated = rmsBean.getLastActivatedTime();
+    	// delete sequences that have been timedout or deleted for more than 
+    	// the SequenceRemovalTimeoutInterval
+   	
+    	if ((lastActivated + deleteTime) < timeNow) {
+    		if (log.isDebugEnabled())
+    			log.debug("Removing RMSBean " + rmsBean);
+    		storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+    		storageManager.removeMessageContext( rmsBean.getReferenceMessageStoreKey() );
+    	}	    	
+    }
+
+		if (log.isDebugEnabled()) 
+			log.debug("Exit: Sender::deleteRMSBeans");
+	}
+
+	private void unblockTransportThreads(StorageManager manager)
+	throws SandeshaStorageException
+	{
+		if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+
+		Transaction transaction = null;
+		try {
+			transaction = manager.getTransaction();
+			
+			// This finder will look for beans that have been locking the transport for longer than
+			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+			// for us.
+			SenderBean finder = new SenderBean();
+			finder.setSend(false);
+			finder.setTransportAvailable(true);
+			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+			
+			List beans = manager.getSenderBeanMgr().find(finder);
+			Iterator beanIter = beans.iterator();
+			while(beanIter.hasNext()) {
+				// The beans we have found are assigned to an internal sequence id, but the create
+				// sequence has not completed yet (and perhaps never will). Server-side, most of the
+				// info that we can usefully print is associated with the inbound sequence that generated
+				// this message.
+				SenderBean bean = (SenderBean) beanIter.next();
+				
+				// Load the message, so that we can free the transport (if there is one there). The
+				// case we are trying to free up is when there is a request-response transport, and
+				// it's still there waiting.
+				MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
+
+				RequestResponseTransport t = null;
+				MessageContext inMsg = null;
+				OperationContext op = msgCtx.getOperationContext();
+				if (op != null)
+					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+				if (inMsg != null)
+					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+	
+				if((t != null && !RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
+					if(log.isWarnEnabled()) {
+						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
+						log.warn(message);
+					}
+					// If the message is a reply, then the request may need to be acked. Rather
+					// than just return a HTTP 202, we should try to send an ack.
+					boolean sendAck = false;
+					RMDBean inbound = null;
+					String inboundSeq = bean.getInboundSequenceId();
+					if(inboundSeq != null) 
+						inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
+					
+					if(inbound != null) {
+						String acksTo = inbound.getAcksToEPR();
+						EndpointReference acksToEPR = new EndpointReference(acksTo);
+						if(acksTo == null || acksToEPR.hasAnonymousAddress())
+							sendAck = true;
+					}
+					
+					if(sendAck) {
+						RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
+								rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
+						AcknowledgementManager.sendAckNow(ackRMMsgCtx);
+						TransportUtils.setResponseWritten(msgCtx, true);
+					} else {
+						TransportUtils.setResponseWritten(msgCtx, false);
+					}
+	
+					// Mark the bean so that we know the transport is missing, and reset the send time
+					bean.setTransportAvailable(false);
+					bean.setTimeToSend(System.currentTimeMillis());
+					
+					// Update the bean
+					manager.getSenderBeanMgr().update(bean);
+				}
+			}
+	
+			if(transaction != null && transaction.isActive()) transaction.commit();
+			transaction = null;
+			
+		} catch(Exception e) {
+			// There isn't much we can do here, so log the exception and continue.
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+		} finally {
+			if(transaction != null && transaction.isActive()) transaction.rollback();
+		}
+		
+		if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+	}
+		
+	private void checkForOrphanMessages(StorageManager manager)
+	throws SandeshaStorageException
+	{
+		if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
+		
+		Transaction tran = null;
+		try {
+			tran = manager.getTransaction();
+	
+			// This finder will look for beans that should have been sent, but could not be sent
+			// because they need a MakeConnection message to come in to pick it up. We also factor
+			// in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
+			SenderBean finder = new SenderBean();
+			finder.setSend(true);
+			finder.setTransportAvailable(false);
+			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+			
+			List beans = manager.getSenderBeanMgr().find(finder);
+			Iterator beanIter = beans.iterator();
+			while(beanIter.hasNext()) {
+				SenderBean bean = (SenderBean) beanIter.next();
+				
+				// Emit a message to warn the user that MakeConnections are not arriving to pick
+				// messages up
+				if(log.isWarnEnabled()) {
+					String messageType = Integer.toString(bean.getMessageType());
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, messageType);
+					log.warn(message);
+				}
+				
+				// Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+				bean.setTimeToSend(System.currentTimeMillis());
+				manager.getSenderBeanMgr().update(bean);
+			}
+	
+			if(tran != null && tran.isActive()) tran.commit();
+			tran = null;
+	
+		} catch(Exception e) {
+			// There isn't much we can do here, so log the exception and continue.
+			if(log.isDebugEnabled()) log.debug("Exception", e);
+		} finally {
+			if(tran != null && tran.isActive()) tran.rollback();
+		}
+		
+		if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
+	}
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SequenceEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Accept.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Accept.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Accept.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Accept.java Wed Aug  8 17:57:51 2007
@@ -1,117 +1,117 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.wsrm;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axis2.AxisFault;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-
-/**
- * Represents the RM Accept element which may come within the 
- * Create Sequence Response.
- */
-
-public class Accept implements IOMRMElement {
-
-	private AcksTo acksTo;
-	
-	private String rmNamespaceValue;
-	
-	// Constructor used during parsing
-	public Accept(String rmNamespaceValue) throws SandeshaException {
-		if (!isNamespaceSupported(rmNamespaceValue))
-			throw new SandeshaException (SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.unknownNamespace,
-					rmNamespaceValue));
-		
-		this.rmNamespaceValue = rmNamespaceValue;
-	}
-	
-	// Constructor used during writing
-	public Accept(String rmNamespace, AcksTo acksTo) throws SandeshaException {
-		this(rmNamespace);
-		this.acksTo = acksTo;
-	}
-
-	public String getNamespaceValue(){
-		return rmNamespaceValue;
-	}
-	
-	public String getAddressingNamespaceValue() {
-		if(acksTo != null) return acksTo.getAddressingNamespaceValue();
-		return null;
-	}
-	
-	public Object fromOMElement(OMElement element) throws OMException,AxisFault {
-		
-		OMElement acceptPart = element.getFirstChildWithName(new QName(
-				rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACCEPT));
-		if (acceptPart == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.noAcceptPartInElement,
-					element.toString()));
-
-		acksTo = new AcksTo(rmNamespaceValue);
-		acksTo.fromOMElement(acceptPart);
-
-		return this;
-	}
-
-	public OMElement toOMElement(OMElement element) throws OMException,AxisFault {
-
-		OMFactory factory = element.getOMFactory();
-		
-		if (acksTo == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.acceptNullAcksTo));
-
-		OMNamespace rmNamespace = factory.createOMNamespace(rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
-		OMElement acceptElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACCEPT, rmNamespace);
-		
-		acksTo.toOMElement(acceptElement);
-		element.addChild(acceptElement);
-
-		return element;
-	}
-
-	public void setAcksTo(AcksTo acksTo) {
-		this.acksTo = acksTo;
-	}
-
-	public AcksTo getAcksTo() {
-		return acksTo;
-	}
-	
-	public boolean isNamespaceSupported (String namespaceName) {
-		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		return false;
-	}
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axis2.AxisFault;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Represents the RM Accept element which may come within the 
+ * Create Sequence Response.
+ */
+
+public class Accept implements IOMRMElement {
+
+	private AcksTo acksTo;
+	
+	private String rmNamespaceValue;
+	
+	// Constructor used during parsing
+	public Accept(String rmNamespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(rmNamespaceValue))
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.unknownNamespace,
+					rmNamespaceValue));
+		
+		this.rmNamespaceValue = rmNamespaceValue;
+	}
+	
+	// Constructor used during writing
+	public Accept(String rmNamespace, AcksTo acksTo) throws SandeshaException {
+		this(rmNamespace);
+		this.acksTo = acksTo;
+	}
+
+	public String getNamespaceValue(){
+		return rmNamespaceValue;
+	}
+	
+	public String getAddressingNamespaceValue() {
+		if(acksTo != null) return acksTo.getAddressingNamespaceValue();
+		return null;
+	}
+	
+	public Object fromOMElement(OMElement element) throws OMException,AxisFault {
+		
+		OMElement acceptPart = element.getFirstChildWithName(new QName(
+				rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.ACCEPT));
+		if (acceptPart == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.noAcceptPartInElement,
+					element.toString()));
+
+		acksTo = new AcksTo(rmNamespaceValue);
+		acksTo.fromOMElement(acceptPart);
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement element) throws OMException,AxisFault {
+
+		OMFactory factory = element.getOMFactory();
+		
+		if (acksTo == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.acceptNullAcksTo));
+
+		OMNamespace rmNamespace = factory.createOMNamespace(rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		OMElement acceptElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACCEPT, rmNamespace);
+		
+		acksTo.toOMElement(acceptElement);
+		element.addChild(acceptElement);
+
+		return element;
+	}
+
+	public void setAcksTo(AcksTo acksTo) {
+		this.acksTo = acksTo;
+	}
+
+	public AcksTo getAcksTo() {
+		return acksTo;
+	}
+	
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Accept.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckFinal.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckFinal.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckFinal.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckFinal.java Wed Aug  8 17:57:51 2007
@@ -1,86 +1,86 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.wsrm;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-
-/**
- * This represent the wsrm:final element that may be present withing a sequence acknowledgement.
- */
-public class AckFinal implements IOMRMElement {
-
-	private String namespaceValue = null;
-	
-	public AckFinal(String namespaceValue) throws SandeshaException {
-		if (!isNamespaceSupported(namespaceValue))
-			throw new SandeshaException (SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.unknownSpec,
-					namespaceValue));
-		
-		this.namespaceValue = namespaceValue;
-	}
-
-	public String getNamespaceValue(){
-		return namespaceValue;
-	}
-
-	public Object fromOMElement(OMElement element) throws OMException {
-		
-		OMElement finalPart = element.getFirstChildWithName(new QName(
-				namespaceValue, Sandesha2Constants.WSRM_COMMON.FINAL));
-		if (finalPart == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.noFinalPartInElement,
-					element.toString()));
-
-		return this;
-	}
-
-	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
-		//soapheaderblock element will be given
-
-		OMFactory factory = sequenceAckElement.getOMFactory();
-		
-		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
-		OMElement finalElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.FINAL, rmNamespace);
-		sequenceAckElement.addChild(finalElement);
-
-		return sequenceAckElement;
-	}
-
-	
-	//this element is only supported in 2005_05_10 spec.
-	public boolean isNamespaceSupported (String namespaceName) {
-		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
-			return false;
-		
-		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		return false;
-	}
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * This represent the wsrm:final element that may be present withing a sequence acknowledgement.
+ */
+public class AckFinal implements IOMRMElement {
+
+	private String namespaceValue = null;
+	
+	public AckFinal(String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.unknownSpec,
+					namespaceValue));
+		
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue(){
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement element) throws OMException {
+		
+		OMElement finalPart = element.getFirstChildWithName(new QName(
+				namespaceValue, Sandesha2Constants.WSRM_COMMON.FINAL));
+		if (finalPart == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.noFinalPartInElement,
+					element.toString()));
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+		//soapheaderblock element will be given
+
+		OMFactory factory = sequenceAckElement.getOMFactory();
+		
+		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		OMElement finalElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.FINAL, rmNamespace);
+		sequenceAckElement.addChild(finalElement);
+
+		return sequenceAckElement;
+	}
+
+	
+	//this element is only supported in 2005_05_10 spec.
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return false;
+		
+		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckFinal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckNone.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckNone.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckNone.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckNone.java Wed Aug  8 17:57:51 2007
@@ -1,85 +1,85 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.wsrm;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-
-
-/**
- * This represent the wsrm:none element that may be present withing a sequence acknowledgement.
- */
-public class AckNone implements IOMRMElement {
-
-	private String namespaceValue = null;
-	
-	public AckNone(String namespaceValue) throws SandeshaException {
-		if (!isNamespaceSupported(namespaceValue))
-			throw new SandeshaException (SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.unknownSpec,
-					namespaceValue));
-		
-		this.namespaceValue = namespaceValue;
-	}
-
-	public String getNamespaceValue(){
-		return namespaceValue;
-	}
-
-	public Object fromOMElement(OMElement element) throws OMException {
-	    
-		OMElement nonePart = element.getFirstChildWithName(new QName(
-				namespaceValue, Sandesha2Constants.WSRM_COMMON.NONE));
-		if (nonePart == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.noNonePartInElement,
-					element.toString()));
-
-		return this;
-	}
-
-	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
-		
-		OMFactory factory = sequenceAckElement.getOMFactory();
-	    
-	    OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
-	    OMElement noneElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.NONE, rmNamespace);
-
-		sequenceAckElement.addChild(noneElement);
-		return sequenceAckElement;
-	}
-	
-	//this element is only supported in 2005_05_10 spec.
-	public boolean isNamespaceSupported (String namespaceName) {
-		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
-			return false;
-		
-		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		return false;
-	}
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+
+/**
+ * This represent the wsrm:none element that may be present withing a sequence acknowledgement.
+ */
+public class AckNone implements IOMRMElement {
+
+	private String namespaceValue = null;
+	
+	public AckNone(String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.unknownSpec,
+					namespaceValue));
+		
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue(){
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement element) throws OMException {
+	    
+		OMElement nonePart = element.getFirstChildWithName(new QName(
+				namespaceValue, Sandesha2Constants.WSRM_COMMON.NONE));
+		if (nonePart == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.noNonePartInElement,
+					element.toString()));
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement sequenceAckElement) throws OMException {
+		
+		OMFactory factory = sequenceAckElement.getOMFactory();
+	    
+	    OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+	    OMElement noneElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.NONE, rmNamespace);
+
+		sequenceAckElement.addChild(noneElement);
+		return sequenceAckElement;
+	}
+	
+	//this element is only supported in 2005_05_10 spec.
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return false;
+		
+		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckNone.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckRequested.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckRequested.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckRequested.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckRequested.java Wed Aug  8 17:57:51 2007
@@ -1,150 +1,150 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.wsrm;
-
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.axiom.soap.SOAPHeaderBlock;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-
-/**
- * Represent the AckRequested header block.
- * The 2005/02 spec includes a 'MessageNumber' part in the ack request, but
- * the 2006/08 spec does not. As the message number was never used in our
- * implementation we simply ignore it.
- */
-
-public class AckRequested implements IOMRMPart {
-	
-	private Identifier identifier;
-	private String namespaceValue = null;
-	private boolean mustUnderstand = false;
-	
-	public AckRequested(String namespaceValue) throws SandeshaException {
-		if (!isNamespaceSupported(namespaceValue))
-			throw new SandeshaException (SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.unknownSpec,
-					namespaceValue));
-		
-		this.namespaceValue = namespaceValue;
-	}
-
-	public String getNamespaceValue() {
-		return namespaceValue;
-	}
-
-	public Object fromOMElement(OMElement ackReqElement) throws OMException,SandeshaException {
-
-		identifier = new Identifier(namespaceValue);
-		identifier.fromOMElement(ackReqElement);
-
-		// Indicate that we have processed this SOAPHeaderBlock
-		((SOAPHeaderBlock)ackReqElement).setProcessed();
-
-		return this;
-	}
-
-	public OMElement toOMElement(OMElement header) throws OMException {
-
-		if (header == null || !(header instanceof SOAPHeader))
-			throw new OMException(
-					SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.ackRequestedCannotBeAddedToNonHeader));
-
-		if (identifier == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.ackRequestNullID));
-		
-		OMFactory factory = header.getOMFactory();
-		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
-
-		Iterator iter = header.getChildrenWithName(new QName (namespaceValue,Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED));
-		while (iter.hasNext()) {
-			OMElement ackRequestedElement = (OMElement) iter.next();
-			
-			OMElement identifierElement = ackRequestedElement.getFirstChildWithName(new QName (namespaceValue,
-					Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
-			String identifierVal = null;
-			if (identifierElement!=null)
-				identifierVal = identifierElement.getText();
-			
-			if (identifierVal!=null && 
-					(identifierVal.equals(identifier.getIdentifier()) || identifierVal.equals(Sandesha2Constants.TEMP_SEQUENCE_ID)))
-				ackRequestedElement.detach();
-			
-		}
-		
-		SOAPHeader SOAPHdr = (SOAPHeader) header;
-		SOAPHeaderBlock ackReqHdrBlock = SOAPHdr.addHeaderBlock(Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED, rmNamespace);
-		ackReqHdrBlock.setMustUnderstand(isMustUnderstand());
-
-		identifier.toOMElement(ackReqHdrBlock);
-
-		return header;
-	}
-
-	public void setIdentifier(Identifier identifier) {
-		this.identifier = identifier;
-	}
-
-	public Identifier getIdentifier() {
-		return identifier;
-	}
-
-	public void toSOAPEnvelope(SOAPEnvelope envelope) {
-		SOAPHeader header = envelope.getHeader();
-		
-		if (header==null) {
-			SOAPFactory factory = (SOAPFactory)envelope.getOMFactory();
-			header = factory.createSOAPHeader(envelope);
-		}
-		
-		toOMElement(header);
-	}
-	
-	public boolean isMustUnderstand() {
-		return mustUnderstand;
-	}
-
-	public void setMustUnderstand(boolean mustUnderstand) {
-		this.mustUnderstand = mustUnderstand;
-	}
-	
-	public boolean isNamespaceSupported (String namespaceName) {
-		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		return false;
-	}
-	
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Represent the AckRequested header block.
+ * The 2005/02 spec includes a 'MessageNumber' part in the ack request, but
+ * the 2006/08 spec does not. As the message number was never used in our
+ * implementation we simply ignore it.
+ */
+
+public class AckRequested implements IOMRMPart {
+	
+	private Identifier identifier;
+	private String namespaceValue = null;
+	private boolean mustUnderstand = false;
+	
+	public AckRequested(String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.unknownSpec,
+					namespaceValue));
+		
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue() {
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement ackReqElement) throws OMException,SandeshaException {
+
+		identifier = new Identifier(namespaceValue);
+		identifier.fromOMElement(ackReqElement);
+
+		// Indicate that we have processed this SOAPHeaderBlock
+		((SOAPHeaderBlock)ackReqElement).setProcessed();
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement header) throws OMException {
+
+		if (header == null || !(header instanceof SOAPHeader))
+			throw new OMException(
+					SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.ackRequestedCannotBeAddedToNonHeader));
+
+		if (identifier == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.ackRequestNullID));
+		
+		OMFactory factory = header.getOMFactory();
+		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+
+		Iterator iter = header.getChildrenWithName(new QName (namespaceValue,Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED));
+		while (iter.hasNext()) {
+			OMElement ackRequestedElement = (OMElement) iter.next();
+			
+			OMElement identifierElement = ackRequestedElement.getFirstChildWithName(new QName (namespaceValue,
+					Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
+			String identifierVal = null;
+			if (identifierElement!=null)
+				identifierVal = identifierElement.getText();
+			
+			if (identifierVal!=null && 
+					(identifierVal.equals(identifier.getIdentifier()) || identifierVal.equals(Sandesha2Constants.TEMP_SEQUENCE_ID)))
+				ackRequestedElement.detach();
+			
+		}
+		
+		SOAPHeader SOAPHdr = (SOAPHeader) header;
+		SOAPHeaderBlock ackReqHdrBlock = SOAPHdr.addHeaderBlock(Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED, rmNamespace);
+		ackReqHdrBlock.setMustUnderstand(isMustUnderstand());
+
+		identifier.toOMElement(ackReqHdrBlock);
+
+		return header;
+	}
+
+	public void setIdentifier(Identifier identifier) {
+		this.identifier = identifier;
+	}
+
+	public Identifier getIdentifier() {
+		return identifier;
+	}
+
+	public void toSOAPEnvelope(SOAPEnvelope envelope) {
+		SOAPHeader header = envelope.getHeader();
+		
+		if (header==null) {
+			SOAPFactory factory = (SOAPFactory)envelope.getOMFactory();
+			header = factory.createSOAPHeader(envelope);
+		}
+		
+		toOMElement(header);
+	}
+	
+	public boolean isMustUnderstand() {
+		return mustUnderstand;
+	}
+
+	public void setMustUnderstand(boolean mustUnderstand) {
+		this.mustUnderstand = mustUnderstand;
+	}
+	
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+	
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AckRequested.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AcknowledgementRange.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AcknowledgementRange.java?view=diff&rev=564063&r1=564062&r2=564063
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AcknowledgementRange.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AcknowledgementRange.java Wed Aug  8 17:57:51 2007
@@ -1,144 +1,144 @@
-/*
- * Copyright  1999-2004 The Apache Software Foundation.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.sandesha2.wsrm;
-
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMException;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-
-/**
- * Represents and AcknowledgementRange element.
- */
-
-public class AcknowledgementRange implements IOMRMElement {
-	
-	private long upperValue;
-	
-	private long lowerValue;
-	
-	private String namespaceValue = null;
-	
-	public AcknowledgementRange(String namespaceValue) throws SandeshaException {
-		if (!isNamespaceSupported(namespaceValue))
-			throw new SandeshaException (SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.unknownSpec,
-					namespaceValue));
-		
-		this.namespaceValue = namespaceValue;
-	}
-
-	public String getNamespaceValue() {
-		return namespaceValue;
-	}
-
-	public Object fromOMElement(OMElement ackRangePart) throws OMException {
-
-		if (ackRangePart == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nullPassedElement));
-
-		OMAttribute lowerAttrib = ackRangePart.getAttribute(new QName(
-				Sandesha2Constants.WSRM_COMMON.LOWER));
-		OMAttribute upperAttrib = ackRangePart.getAttribute(new QName(
-				Sandesha2Constants.WSRM_COMMON.UPPER));
-
-		if (lowerAttrib == null || upperAttrib == null)
-			throw new OMException(
-					SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.noUpperOrLowerAttributesInElement,
-							ackRangePart.toString()));
-
-		try {
-			long lower = Long.parseLong(lowerAttrib.getAttributeValue());
-			long upper = Long.parseLong(upperAttrib.getAttributeValue());
-			upperValue = upper;
-			lowerValue = lower;
-		} catch (Exception ex) {
-			throw new OMException(
-					SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.ackRandDoesNotHaveCorrectValues,
-							ackRangePart.toString()));
-		}
-
-		return this;
-	}
-
-	public OMElement toOMElement(OMElement sequenceAckElement)
-			throws OMException {
-
-		if (sequenceAckElement == null)
-			throw new OMException(SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.cannotSetAckRangeNullElement));
-
-		if (upperValue <= 0 || lowerValue <= 0 || lowerValue > upperValue)
-			throw new OMException(
-					SandeshaMessageHelper.getMessage(
-							SandeshaMessageKeys.ackRandDoesNotHaveCorrectValues,
-							upperValue + ":" + lowerValue));
-
-		OMFactory factory = sequenceAckElement.getOMFactory();
-		
-		OMAttribute lowerAttrib = factory.createOMAttribute(
-				Sandesha2Constants.WSRM_COMMON.LOWER, null, Long.toString(lowerValue));
-		OMAttribute upperAttrib = factory.createOMAttribute(
-				Sandesha2Constants.WSRM_COMMON.UPPER, null, Long.toString(upperValue));
-
-		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
-		OMElement acknowledgementRangeElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACK_RANGE, rmNamespace);
-		
-		acknowledgementRangeElement.addAttribute(lowerAttrib);
-		acknowledgementRangeElement.addAttribute(upperAttrib);
-		sequenceAckElement.addChild(acknowledgementRangeElement);
-
-		return sequenceAckElement;
-	}
-
-	public long getLowerValue() {
-		return lowerValue;
-	}
-
-	public void setLowerValue(long lowerValue) {
-		this.lowerValue = lowerValue;
-	}
-
-	public long getUpperValue() {
-		return upperValue;
-	}
-
-	public void setUpperValue(long upperValue) {
-		this.upperValue = upperValue;
-	}
-	
-	public boolean isNamespaceSupported (String namespaceName) {
-		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
-			return true;
-		
-		return false;
-	}
-}
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.wsrm;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Represents and AcknowledgementRange element.
+ */
+
+public class AcknowledgementRange implements IOMRMElement {
+	
+	private long upperValue;
+	
+	private long lowerValue;
+	
+	private String namespaceValue = null;
+	
+	public AcknowledgementRange(String namespaceValue) throws SandeshaException {
+		if (!isNamespaceSupported(namespaceValue))
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.unknownSpec,
+					namespaceValue));
+		
+		this.namespaceValue = namespaceValue;
+	}
+
+	public String getNamespaceValue() {
+		return namespaceValue;
+	}
+
+	public Object fromOMElement(OMElement ackRangePart) throws OMException {
+
+		if (ackRangePart == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.nullPassedElement));
+
+		OMAttribute lowerAttrib = ackRangePart.getAttribute(new QName(
+				Sandesha2Constants.WSRM_COMMON.LOWER));
+		OMAttribute upperAttrib = ackRangePart.getAttribute(new QName(
+				Sandesha2Constants.WSRM_COMMON.UPPER));
+
+		if (lowerAttrib == null || upperAttrib == null)
+			throw new OMException(
+					SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.noUpperOrLowerAttributesInElement,
+							ackRangePart.toString()));
+
+		try {
+			long lower = Long.parseLong(lowerAttrib.getAttributeValue());
+			long upper = Long.parseLong(upperAttrib.getAttributeValue());
+			upperValue = upper;
+			lowerValue = lower;
+		} catch (Exception ex) {
+			throw new OMException(
+					SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.ackRandDoesNotHaveCorrectValues,
+							ackRangePart.toString()));
+		}
+
+		return this;
+	}
+
+	public OMElement toOMElement(OMElement sequenceAckElement)
+			throws OMException {
+
+		if (sequenceAckElement == null)
+			throw new OMException(SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.cannotSetAckRangeNullElement));
+
+		if (upperValue <= 0 || lowerValue <= 0 || lowerValue > upperValue)
+			throw new OMException(
+					SandeshaMessageHelper.getMessage(
+							SandeshaMessageKeys.ackRandDoesNotHaveCorrectValues,
+							upperValue + ":" + lowerValue));
+
+		OMFactory factory = sequenceAckElement.getOMFactory();
+		
+		OMAttribute lowerAttrib = factory.createOMAttribute(
+				Sandesha2Constants.WSRM_COMMON.LOWER, null, Long.toString(lowerValue));
+		OMAttribute upperAttrib = factory.createOMAttribute(
+				Sandesha2Constants.WSRM_COMMON.UPPER, null, Long.toString(upperValue));
+
+		OMNamespace rmNamespace = factory.createOMNamespace(namespaceValue,Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+		OMElement acknowledgementRangeElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.ACK_RANGE, rmNamespace);
+		
+		acknowledgementRangeElement.addAttribute(lowerAttrib);
+		acknowledgementRangeElement.addAttribute(upperAttrib);
+		sequenceAckElement.addChild(acknowledgementRangeElement);
+
+		return sequenceAckElement;
+	}
+
+	public long getLowerValue() {
+		return lowerValue;
+	}
+
+	public void setLowerValue(long lowerValue) {
+		this.lowerValue = lowerValue;
+	}
+
+	public long getUpperValue() {
+		return upperValue;
+	}
+
+	public void setUpperValue(long upperValue) {
+		this.upperValue = upperValue;
+	}
+	
+	public boolean isNamespaceSupported (String namespaceName) {
+		if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(namespaceName))
+			return true;
+		
+		return false;
+	}
+}

Propchange: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/AcknowledgementRange.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org