You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC
svn commit: r531400 [7/18] - in /webservices/sandesha/trunk/java/modules:
client/ core/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,165 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.storage.inmemory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beans.RMBean;
+import org.apache.sandesha2.workers.SandeshaThread;
+
+/**
+ * This class does not really implement transactions, but it is a good
+ * place to implement locking for the in memory storage manager.
+ */
+
+public class InMemoryTransaction implements Transaction {
+
+ private static final Log log = LogFactory.getLog(InMemoryTransaction.class);
+
+ private InMemoryStorageManager manager;
+ private String threadName;
+ private int threadId;
+ private ArrayList enlistedBeans = new ArrayList();
+ private InMemoryTransaction waitingForTran = null;
+ private boolean sentMessages = false;
+ private boolean receivedMessages = false;
+
+ InMemoryTransaction(InMemoryStorageManager manager, String threadName, int id) {
+ if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
+ this.manager = manager;
+ this.threadName = threadName;
+ this.threadId = id;
+ if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::<init>, " + this);
+ }
+
+ public void commit() {
+ releaseLocks();
+ if(sentMessages) manager.getSender().wakeThread();
+ if(receivedMessages) {
+ SandeshaThread invoker = manager.getInvoker();
+ if(invoker != null) invoker.wakeThread();
+ }
+ }
+
+ public void rollback() {
+ releaseLocks();
+ }
+
+ public boolean isActive () {
+ return !enlistedBeans.isEmpty();
+ }
+
+ public void enlist(RMBean bean) throws SandeshaStorageException {
+ if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
+ if(bean != null) {
+ synchronized (bean) {
+ InMemoryTransaction other = (InMemoryTransaction) bean.getTransaction();
+ while(other != null && other != this) {
+ // Put ourselves into the list of waiters
+ waitingForTran = other;
+
+ // Look to see if there is a loop in the chain of waiters
+ if(!enlistedBeans.isEmpty()) {
+ HashSet set = new HashSet();
+ set.add(this);
+ while(other != null) {
+ if(set.contains(other)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
+ SandeshaStorageException e = new SandeshaStorageException(message);
+
+ // Do our best to get out of the way of the other work in the system
+ waitingForTran = null;
+ releaseLocks();
+
+ if(log.isDebugEnabled()) log.debug(message, e);
+ throw e;
+ }
+ set.add(other);
+ other = other.waitingForTran;
+ }
+ }
+
+ try {
+ if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
+ bean.wait();
+ } catch(InterruptedException e) {
+ // Do nothing
+ }
+ other = (InMemoryTransaction) bean.getTransaction();
+ }
+
+ waitingForTran = null;
+ if(other == null) {
+ if(log.isDebugEnabled()) log.debug(this + " locking bean");
+ bean.setTransaction(this);
+ enlistedBeans.add(bean);
+ }
+ }
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::enlist");
+ }
+
+ private void releaseLocks() {
+ if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::releaseLocks, " + this);
+ manager.removeTransaction(this);
+
+ Iterator beans = enlistedBeans.iterator();
+ while(beans.hasNext()) {
+ RMBean bean = (RMBean) beans.next();
+ synchronized (bean) {
+ bean.setTransaction(null);
+ bean.notify();
+ }
+ }
+ enlistedBeans.clear();
+
+ if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::releaseLocks");
+ }
+
+ public String toString() {
+ StringBuffer result = new StringBuffer();
+ result.append("[InMemoryTransaction, id:");
+ result.append(threadId);
+ result.append(", name: ");
+ result.append(threadName);
+ result.append(", locks: ");
+ result.append(enlistedBeans.size());
+ result.append("]");
+ return result.toString();
+ }
+
+ public void setReceivedMessages(boolean receivedMessages) {
+ this.receivedMessages = receivedMessages;
+ }
+
+ public void setSentMessages(boolean sentMessages) {
+ this.sentMessages = sentMessages;
+ }
+}
+
+
+
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportOutDesc.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.transport;
+
+import org.apache.axis2.description.TransportOutDescription;
+
+public class Sandesha2TransportOutDesc extends TransportOutDescription {
+
+ public Sandesha2TransportOutDesc() {
+ super ("Sandesha2TransportOutDesc");
+ this.setSender(new Sandesha2TransportSender ());
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/transport/Sandesha2TransportSender.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.transport;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.HandlerDescription;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+public class Sandesha2TransportSender implements TransportSender {
+
+ private static final Log log = LogFactory.getLog(Sandesha2TransportSender.class);
+
+ public void cleanup(MessageContext msgContext) throws AxisFault {
+
+ }
+
+ public void stop() {
+
+ }
+
+ public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sandesha2TransportSender::invoke, " + msgContext.getEnvelope().getHeader());
+
+ //setting the correct transport sender.
+ TransportOutDescription transportOut = (TransportOutDescription) msgContext.getProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC);
+
+ if (transportOut==null)
+ throw new SandeshaException (SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.transportOutNotPresent));
+
+ msgContext.setTransportOut(transportOut);
+
+ String key = (String) msgContext.getProperty(Sandesha2Constants.MESSAGE_STORE_KEY);
+
+ if (key==null)
+ throw new SandeshaException (SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.cannotGetStorageKey));
+
+ ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+ AxisConfiguration axisConfiguration = configurationContext.getAxisConfiguration();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,axisConfiguration);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,Sandesha2Constants.VALUE_TRUE);
+
+ storageManager.storeMessageContext(key,msgContext);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sandesha2TransportSender::invoke");
+ return InvocationResponse.CONTINUE;
+ }
+
+ //Below methods are not used
+ public void cleanUp(MessageContext msgContext) throws AxisFault {}
+
+ public void init(ConfigurationContext confContext, TransportOutDescription transportOut) throws AxisFault {}
+
+ public void cleanup() {}
+
+ public HandlerDescription getHandlerDesc() {return null;}
+
+ public String getName() { return null;}
+
+ public Parameter getParameter(String name) { return null; }
+
+ public void init(HandlerDescription handlerdesc) {}
+
+ public void flowComplete(MessageContext msgContext) {}
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,324 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Contains logic for managing acknowledgements.
+ */
+
+public class AcknowledgementManager {
+
+ private static Log log = LogFactory.getLog(AcknowledgementManager.class);
+
+ /**
+ * Piggybacks any available acks of the same sequence to the given
+ * application message.
+ *
+ * @param applicationRMMsgContext
+ * @throws SandeshaException
+ */
+ public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager)
+ throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+ // If this message is going to an anonymous address, and the inbound sequence has
+ // anonymous acksTo, then we add in an ack for the inbound sequence.
+ EndpointReference target = rmMessageContext.getTo();
+ if(target == null || target.hasAnonymousAddress()) {
+ if(target != null && SandeshaUtil.isWSRMAnonymous(target.getAddress())) {
+ // Search for any sequences that have an acksTo that matches this target, and add an ack
+ // for each of them.
+ RMDBean findBean = new RMDBean();
+ findBean.setAcksToEPR(target.getAddress());
+ findBean.setTerminated(false);
+ Collection rmdBeans = storageManager.getRMDBeanMgr().find(findBean);
+ Iterator sequences = rmdBeans.iterator();
+ while(sequences.hasNext()) {
+ RMDBean sequence = (RMDBean) sequences.next();
+ if (sequence.getHighestInMessageNumber() > 0) {
+ if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequence.getSequenceID());
+ RMMsgCreator.addAckMessage(rmMessageContext, sequence.getSequenceID(), sequence);
+ }
+ }
+
+ } else {
+ // We have no good indicator of the identity of the destination, so the only sequence
+ // we can ack is the inbound one that caused us to create this response.
+ String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+ if(inboundSequence != null) {
+ RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
+ if(inboundBean != null && !inboundBean.isTerminated()) {
+ String acksTo = inboundBean.getAcksToEPR();
+ EndpointReference acksToEPR = new EndpointReference(acksTo);
+
+ if(acksTo == null || acksToEPR.hasAnonymousAddress()) {
+ if(log.isDebugEnabled()) log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+ RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean);
+ }
+ }
+ }
+ }
+ if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
+ return;
+ }
+
+ // From here on, we must be dealing with a real address. Piggyback all sequences that have an
+ // acksTo that matches the To address, and that have an ackMessage queued up for sending.
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ findBean.setSend(true);
+ findBean.setToAddress(target.getAddress());
+
+ Collection collection = retransmitterBeanMgr.find(findBean);
+ Iterator it = collection.iterator();
+ while (it.hasNext()) {
+ SenderBean ackBean = (SenderBean) it.next();
+
+ // Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
+ long timeNow = System.currentTimeMillis();
+ if (ackBean.getTimeToSend() > timeNow) {
+ // Delete the beans that would have sent the ack
+ retransmitterBeanMgr.delete(ackBean.getMessageID());
+ storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
+
+ String sequenceId = ackBean.getSequenceID();
+ if (log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
+
+ RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ if(rmdBean != null && !rmdBean.isTerminated()) {
+ RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
+ }
+
+ /**
+ *
+ * @param referenceRMMessage
+ * @param sequencePropertyKey
+ * @param sequenceId
+ * @param storageManager
+ * @param makeResponse Some work will be done to make the new ack message the response of the reference message.
+ * @return
+ * @throws AxisFault
+ */
+ public static RMMsgContext generateAckMessage(
+
+ RMMsgContext referenceRMMessage,
+ RMDBean rmdBean,
+ String sequenceId,
+ StorageManager storageManager,
+ boolean serverSide
+
+ ) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::generateAckMessage " + rmdBean);
+
+ MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+
+ EndpointReference acksTo = new EndpointReference(rmdBean.getAcksToEPR());
+
+ if (acksTo.getAddress() == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+ AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.ACK,
+ referenceRMMessage.getRMSpecVersion(),
+ referenceMsg.getAxisService());
+
+ MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+ ackRMMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(referenceMsg.getEnvelope()));
+
+ // Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+
+ ackMsgCtx.setEnvelope(envelope);
+
+ ackMsgCtx.setTo(acksTo);
+
+ ackMsgCtx.setServerSide(serverSide);
+
+ // adding the SequenceAcknowledgement part.
+ RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::generateAckMessage");
+ return ackRMMsgCtx;
+ }
+
+
+
+
+ public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
+
+ boolean result = false;
+ Range complete = new Range(1, lastMessageNo);
+ if(ackRanges.isRangeCompleted(complete)) {
+ result = true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::verifySequenceCompletion " + result);
+ return result;
+ }
+
+ public static void addAckBeanEntry (
+ RMMsgContext ackRMMsgContext,
+ String sequenceId,
+ long timeToSend,
+ StorageManager storageManager) throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::addAckBeanEntry");
+
+ // Write the acks into the envelope
+ ackRMMsgContext.addSOAPEnvelope();
+
+ MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean ackBean = new SenderBean();
+ ackBean.setMessageContextRefKey(key);
+ ackBean.setMessageID(ackMsgContext.getMessageID());
+ ackBean.setReSend(false);
+ ackBean.setSequenceID(sequenceId);
+ EndpointReference to = ackMsgContext.getTo();
+ if (to!=null)
+ ackBean.setToAddress(to.getAddress());
+
+ ackBean.setSend(true);
+ ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ // removing old acks.
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ findBean.setSend(true);
+ findBean.setReSend(false);
+ findBean.setSequenceID(sequenceId);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+
+ while(it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ if(oldAckBean.getTimeToSend() < timeToSend)
+ timeToSend = oldAckBean.getTimeToSend();
+
+ // removing the retransmitted entry for the oldAck
+ retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+
+ // removing the message store entry for the old ack
+ storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
+ }
+
+ ackBean.setTimeToSend(timeToSend);
+
+ ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ // passing the message through sandesha2sender
+ ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+ SandeshaUtil.executeAndStore(ackRMMsgContext, key);
+
+ // inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
+ if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
+ }
+
+ public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: AcknowledgementManager::sendAckNow");
+
+ // Write the acks into the envelope
+ ackRMMsgContext.addSOAPEnvelope();
+
+ MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
+ ConfigurationContext configContext = ackMsgContext.getConfigurationContext();
+
+ // setting CONTEXT_WRITTEN since acksto is anonymous
+ if (ackRMMsgContext.getMessageContext().getOperationContext() == null) {
+ // operation context will be null when doing in a GLOBAL
+ // handler.
+ AxisOperation op = ackMsgContext.getAxisOperation();
+
+ OperationContext opCtx = ContextFactory.createOperationContext(op, ackRMMsgContext.getMessageContext().getServiceContext());
+ ackRMMsgContext.getMessageContext().setOperationContext(opCtx);
+ }
+
+ ackRMMsgContext.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
+ ackRMMsgContext.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+ ackRMMsgContext.getMessageContext().setServerSide(true);
+
+ AxisEngine engine = new AxisEngine(configContext);
+ engine.send(ackMsgContext);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementManager::sendAckNow");
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,818 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPFault;
+import org.apache.axiom.soap.SOAPFaultCode;
+import org.apache.axiom.soap.SOAPFaultDetail;
+import org.apache.axiom.soap.SOAPFaultReason;
+import org.apache.axiom.soap.SOAPFaultSubCode;
+import org.apache.axiom.soap.SOAPFaultText;
+import org.apache.axiom.soap.SOAPFaultValue;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
+import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.FaultData;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.SequenceFault;
+
+/**
+ * Has logic to check for possible RM related faults and create it.
+ */
+
+public class FaultManager {
+
+ private static final Log log = LogFactory.getLog(FaultManager.class);
+
+ /**
+ * Check weather the LastMessage number has been exceeded and generate the
+ * fault if it is.
+ *
+ * @param msgCtx
+ * @return
+ */
+ public static void checkForLastMsgNumberExceeded(RMMsgContext applicationRMMessage, StorageManager storageManager)
+ throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::checkForLastMsgNumberExceeded");
+/*
+ * TODO - This code currently doesn't actually work
+ Sequence sequence = (Sequence) applicationRMMessage.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ long messageNumber = sequence.getMessageNumber().getMessageNumber();
+ String sequenceID = sequence.getIdentifier().getIdentifier();
+
+ boolean lastMessageNumberExceeded = false;
+ String reason = null;
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+ if (rmsBean != null) {
+ long lastMessageNo = rmsBean.getLastOutMessage();
+ if (messageNumber > lastMessageNo) {
+ lastMessageNumberExceeded = true;
+ reason = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberLargerThanLastMsg, Long
+ .toString(messageNumber), Long.toString(lastMessageNo));
+ }
+ }
+
+ if (lastMessageNumberExceeded) {
+ FaultData faultData = new FaultData();
+ faultData.setType(Sandesha2Constants.SOAPFaults.FaultType.LAST_MESSAGE_NO_EXCEEDED);
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(applicationRMMessage.getSOAPEnvelope());
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ faultData.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ faultData.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ faultData.setSubcode(Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED);
+ faultData.setReason(reason);
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+ String rmNamespace = applicationRMMessage.getRMNamespaceValue();
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmNamespace, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(sequenceID);
+
+ faultData.setDetail(identifierElement);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded, lastMessageNumberExceeded");
+ getFault(applicationRMMessage, faultData, storageManager);
+ }
+*/
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded");
+ }
+
+ public static RMMsgContext checkForMessageNumberRoleover(MessageContext messageContext) {
+ return null;
+ }
+
+ /**
+ * Check whether a Sequence message (a) belongs to a unknown sequence
+ * (generates an UnknownSequence fault) (b) message number exceeds a
+ * predifined limit ( genenrates a Message Number Rollover fault)
+ *
+ * @param msgCtx
+ * @return true if no exception has been thrown and the sequence doesn't exist
+ * @throws SandeshaException
+ */
+ public static boolean checkForUnknownSequence(RMMsgContext rmMessageContext, String sequenceID,
+ StorageManager storageManager) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::checkForUnknownSequence, " + sequenceID);
+
+ boolean validSequence = false;
+
+ // Look for an outbound sequence
+ if (SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID) != null) {
+ validSequence = true;
+ // Look for an inbound sequence
+ } else if(SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID) != null) {
+ validSequence = true;
+ }
+
+ if (!validSequence) {
+
+ if (log.isDebugEnabled())
+ log.debug("Sequence not valid " + sequenceID);
+
+ // Return an UnknownSequence error
+ MessageContext messageContext = rmMessageContext.getMessageContext();
+
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+ FaultData data = new FaultData();
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE ));
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(sequenceID);
+
+ data.setDetail(identifierElement);
+
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSequenceFault, sequenceID));
+
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForUnknownSequence, Sequence unknown");
+ getOrSendFault(rmMessageContext, data);
+ return true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForUnknownSequence");
+ return false;
+ }
+
+ /**
+ * Check weather the Acknowledgement is invalid and generate a fault if it
+ * is.
+ *
+ * @param msgCtx
+ * @return
+ * @throws SandeshaException
+ */
+ public static boolean checkForInvalidAcknowledgement(RMMsgContext ackRMMessageContext, SequenceAcknowledgement sequenceAcknowledgement,
+ StorageManager storageManager, RMSBean rmsBean)
+ throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::checkForInvalidAcknowledgement");
+
+ // check lower<=upper
+ if (ackRMMessageContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForInvalidAcknowledgement, MessageType not an ACK");
+ }
+
+ boolean invalidAck = false;
+
+ List sequenceAckList = sequenceAcknowledgement.getAcknowledgementRanges();
+ Iterator it = sequenceAckList.iterator();
+
+ while (it.hasNext()) {
+ AcknowledgementRange acknowledgementRange = (AcknowledgementRange) it.next();
+ long upper = acknowledgementRange.getUpperValue();
+ long lower = acknowledgementRange.getLowerValue();
+
+ if (lower > upper) {
+ invalidAck = true;
+ // check upper isn't bigger than the highest out msg number
+ } else if ( upper > rmsBean.getHighestOutMessageNumber() ) {
+ invalidAck = true;
+ }
+
+ if (invalidAck) {
+ makeInvalidAcknowledgementFault(ackRMMessageContext, sequenceAcknowledgement,
+ acknowledgementRange, storageManager);
+ return true;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForInvalidAcknowledgement");
+ return false;
+ }
+
+ /**
+ * Makes an InvalidAcknowledgement fault.
+ * @param rmMsgCtx
+ * @param storageManager
+ * @param message
+ * @throws AxisFault
+ */
+ public static void makeInvalidAcknowledgementFault(RMMsgContext rmMsgCtx,
+ SequenceAcknowledgement sequenceAcknowledgement, AcknowledgementRange acknowledgementRange,
+ StorageManager storageManager) throws AxisFault {
+ FaultData data = new FaultData();
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(rmMsgCtx.getMessageContext().getEnvelope());
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT);
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMsgCtx.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT ));
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckFault));
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+
+ OMElement seqAckElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK,
+ rmMsgCtx.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+
+ // Set the sequence Id
+ sequenceAcknowledgement.getIdentifier().toOMElement(seqAckElement);
+
+ // Set the Ack Range
+ acknowledgementRange.toOMElement(seqAckElement);
+
+ data.setDetail(seqAckElement);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForInvalidAcknowledgement, invalid ACK");
+ getOrSendFault(rmMsgCtx, data);
+ }
+
+ /**
+ * Makes a Create sequence refused fault
+ */
+ public static void makeCreateSequenceRefusedFault(RMMsgContext rmMessageContext,
+ String detail,
+ Exception e)
+
+ throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::makeCreateSequenceRefusedFault, " + detail);
+
+ // Return a CreateSequenceRefused error
+ MessageContext messageContext = rmMessageContext.getMessageContext();
+
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+ FaultData data = new FaultData();
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED ));
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(detail);
+ data.setDetail(identifierElement);
+ data.setDetailString(detail);
+
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
+
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED);
+
+ data.setExceptionString(SandeshaUtil.getStackTraceFromException(e));
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::makeCreateSequenceRefusedFault");
+ getOrSendFault(rmMessageContext, data);
+ }
+
+ /**
+ * Checks if a sequence is terminated and returns a SequenceTerminated fault.
+ * @param referenceRMMessage
+ * @param sequenceID
+ * @param rmdBean
+ * @return
+ * @throws AxisFault
+ */
+ public static boolean checkForSequenceTerminated(RMMsgContext referenceRMMessage, String sequenceID, RMSequenceBean bean)
+
+ throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
+
+ if (bean.isTerminated()) {
+ MessageContext referenceMessage = referenceRMMessage.getMessageContext();
+ FaultData data = new FaultData();
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(referenceMessage.getEnvelope());
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED ));
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID));
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED);
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+ String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(sequenceID);
+
+ data.setDetail(identifierElement);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed");
+ getOrSendFault(referenceRMMessage, data);
+ return true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForSequenceClosed");
+ return false;
+ }
+
+ public static boolean checkForSequenceClosed(RMMsgContext referenceRMMessage, String sequenceID,
+ RMDBean rmdBean) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
+
+ if (rmdBean.isClosed()) {
+ MessageContext referenceMessage = referenceRMMessage.getMessageContext();
+ FaultData data = new FaultData();
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(referenceMessage.getEnvelope());
+ if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ else
+ data.setCode(SOAP12Constants.FAULT_CODE_SENDER);
+
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED ));
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotAcceptMsgAsSequenceClosedFault));
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED);
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+ String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(sequenceID);
+
+ data.setDetail(identifierElement);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed");
+ getOrSendFault(referenceRMMessage, data);
+ return true;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::checkForSequenceClosed");
+ return false;
+ }
+
+ /**
+ * Adds the necessary Fault elements as properties to the message context.
+ * Or if this is a SOAP11 Fault, generates the correct RM Fault and sends.
+ *
+ * @param referenceRMMsgContext - Message in reference to which the fault will be generated.
+ * @param data - data for the fault
+ * @return - The dummy fault to be thrown out.
+ *
+ * @throws AxisFault
+ */
+ public static void getOrSendFault(RMMsgContext referenceRMMsgContext, FaultData data) throws AxisFault {
+ SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory();
+
+ SOAPFaultCode faultCode = factory.createSOAPFaultCode();
+ SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode);
+
+ SOAPFaultValue faultColdValue = factory.createSOAPFaultValue(faultCode);
+ SOAPFaultValue faultSubcodeValue = factory.createSOAPFaultValue(faultSubCode);
+
+ faultColdValue.setText(data.getCode());
+
+ faultSubcodeValue.setText(data.getSubcode());
+
+ faultCode.setSubCode(faultSubCode);
+
+ SOAPFaultReason reason = factory.createSOAPFaultReason();
+ SOAPFaultText reasonText = factory.createSOAPFaultText();
+ reasonText.setText(data.getReason());
+
+ SOAPFaultDetail detail = factory.createSOAPFaultDetail();
+ detail.addDetailEntry(data.getDetail());
+
+ String SOAPNamespaceValue = factory.getSoapVersionURI();
+
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
+ reason.addSOAPText(reasonText);
+ referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
+ referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason);
+ referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
+ } else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals (SOAPNamespaceValue)) {
+ reason.setText(data.getReason());
+ referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
+ referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
+ referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
+ // Need to send this message as the Axis Layer doesn't set the "SequenceFault" header
+ MessageContext faultMessageContext =
+ MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null);
+
+ SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue());
+
+ referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
+ // Set the action
+ faultMessageContext.setWSAAction(
+ SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
+
+ if (log.isDebugEnabled())
+ log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
+ // Send the message
+ AxisEngine engine = new AxisEngine(faultMessageContext.getConfigurationContext());
+ engine.sendFault(faultMessageContext);
+
+ return;
+ } else {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion);
+ throw new SandeshaException (message);
+ }
+ AxisFault fault = new AxisFault(faultColdValue.getTextAsQName(), data.getReason(), "", "", data.getDetail());
+ fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
+ throw fault;
+
+ }
+
+ public static boolean isRMFault (String faultSubcodeValue) {
+ if (faultSubcodeValue==null)
+ return false;
+
+ if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.INVALID_ACKNOWLEDGEMENT.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_CLOSED.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equalsIgnoreCase (faultSubcodeValue) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equalsIgnoreCase (faultSubcodeValue) ) {
+
+ return true;
+ }
+
+ return false;
+
+ }
+
+ private static void manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+
+ if (log.isErrorEnabled())
+ log.error(fault);
+
+ SandeshaListener listner = (SandeshaListener) rmMsgCtx.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+ if (listner!=null)
+ listner.onError(fault);
+
+ // Get the SOAPVersion
+ SOAPFactory factory = (SOAPFactory) rmMsgCtx.getSOAPEnvelope().getOMFactory();
+ String SOAPNamespaceValue = factory.getSoapVersionURI();
+
+ String soapFaultSubcode = null;
+ String identifier = null;
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
+ // Log the fault
+ if (faultPart.getCode() != null &&
+ faultPart.getCode().getSubCode() != null &&
+ faultPart.getCode().getSubCode().getValue() != null)
+
+ soapFaultSubcode = faultPart.getCode().getSubCode().getValue().getTextAsQName().getLocalPart();
+
+ // Get the identifier, if there is one.
+ SOAPFaultDetail detail = faultPart.getDetail();
+ if (detail != null)
+ {
+ OMElement identifierOM = detail.getFirstChildWithName(new QName(rmMsgCtx.getRMNamespaceValue(),
+ Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
+ if (identifierOM != null)
+ identifier = identifierOM.getText();
+ }
+
+ } else {
+ // Need to get the sequence part from the Header.
+ try {
+ SequenceFault sequenceFault = (SequenceFault)rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE_FAULT);
+
+ // If the sequence fault part is not null, then we have an RM specific fault.
+ if (sequenceFault != null) {
+ soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart();
+
+ // Get the identifier - if there is one.
+ identifier = sequenceFault.getFaultCode().getDetail();
+ }
+ } catch (SandeshaException e) {
+ if (log.isDebugEnabled())
+ log.debug("Unable to process SequenceFault", e);
+ }
+ }
+
+ if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equals(soapFaultSubcode)) {
+ processCreateSequenceRefusedFault(rmMsgCtx, fault);
+ } else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) ||
+ Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) ) {
+ processSequenceUnknownFault(rmMsgCtx, fault, identifier);
+ }
+ }
+
+ public static void processMessagesForFaults (RMMsgContext rmMsgCtx) throws AxisFault {
+
+ SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope();
+ if (envelope==null)
+ return;
+
+ SOAPFault faultPart = envelope.getBody().getFault();
+
+ if (faultPart != null) {
+
+ // constructing the fault
+ AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart);
+ manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+ }
+
+ }
+
+
+ private static AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart) {
+ AxisFault axisFault = new AxisFault(faultPart.getCode(), faultPart.getReason(), faultPart.getNode(), faultPart
+ .getRole(), faultPart.getDetail());
+
+ return axisFault;
+ }
+
+ /**
+ * Checks to see if the message number received is == to the Long.MAX_VALUE
+ *
+ * Throws and AxisFault, or sends a Fault message if the condition is met.
+ * @throws AxisFault
+ */
+ public static boolean checkForMessageRolledOver(RMMsgContext rmMessageContext, String sequenceId, long msgNo)
+
+ throws AxisFault {
+ if (msgNo == Long.MAX_VALUE) {
+ if (log.isDebugEnabled())
+ log.debug("Max message number reached " + msgNo);
+ // Return a CreateSequenceRefused error
+ MessageContext messageContext = rmMessageContext.getMessageContext();
+
+ int SOAPVersion = SandeshaUtil.getSOAPVersion(messageContext.getEnvelope());
+
+ FaultData data = new FaultData();
+ data.setCode(SOAP11Constants.FAULT_CODE_SENDER);
+ data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
+ Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER ));
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
+ OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
+ rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ identifierElement.setText(sequenceId);
+
+ OMElement maxMsgNumber = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.MAX_MSG_NUMBER,
+ rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
+ maxMsgNumber.setText(Long.toString(msgNo));
+
+ data.setDetail(identifierElement);
+ data.setDetail2(maxMsgNumber);
+
+ data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.messageNumberRollover));
+
+ data.setType(Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER);
+
+ getOrSendFault(rmMessageContext, data);
+
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * On receipt of a CreateSequenceRefused fault, terminate the sequence and notify any waiting
+ * clients of the error.
+ * @param fault
+ * @throws AxisFault
+ */
+ private static void processCreateSequenceRefusedFault(RMMsgContext rmMsgCtx, AxisFault fault) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::processCreateSequenceRefusedFault");
+
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+ .getAxisConfiguration());
+
+ RelatesTo relatesTo = rmMsgCtx.getMessageContext().getRelatesTo();
+ String createSeqMsgId = null;
+ if (relatesTo != null) {
+ createSeqMsgId = relatesTo.getValue();
+ } else {
+ // Work out the related message from the operation context
+ OperationContext context = rmMsgCtx.getMessageContext().getOperationContext();
+ MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+ if(createSeq != null) createSeqMsgId = createSeq.getMessageID();
+ }
+ if(createSeqMsgId == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
+ log.error(message);
+ throw new SandeshaException(message);
+ }
+
+ SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+ RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+
+ RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
+ if (rmsBean == null) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Unable to find RMSBean");
+ return;
+ }
+
+ /* if (rmsBean.getLastSendError() == null) {
+ // Indicate that there was an error when sending the Create Sequence.
+ rmsBean.setLastSendError(fault);
+
+ // Update the RMSBean
+ rmsBeanMgr.update(rmsBean);
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Allowing another CreateSequence attempt");
+ return;
+ }
+*/
+ SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+ if (createSequenceSenderBean == null)
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
+
+ // deleting the create sequence entry.
+ retransmitterMgr.delete(createSeqMsgId);
+
+ // Notify the clients of a failure
+ notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+
+ rmMsgCtx.pause();
+
+ // Cleanup sending side.
+ if (log.isDebugEnabled())
+ log.debug("Terminating sending sequence " + rmsBean);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
+ }
+
+ /**
+ * If the RMD returns a SequenceTerminated, or an Unknown sequence fault, then we should
+ * mark the RMS Sequence as terminated and notify clients of the error.
+ *
+ * @param rmMsgCtx
+ * @param fault
+ * @param identifier
+ */
+ private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID);
+
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+ .getAxisConfiguration());
+
+ // Find the rmsBean
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
+ if (rmsBean != null) {
+
+ // Notify the clients of a failure
+ notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+
+ rmMsgCtx.pause();
+
+ // Cleanup sending side.
+ if (log.isDebugEnabled())
+ log.debug("Terminating sending sequence " + rmsBean);
+ TerminateManager.terminateSendingSide(rmsBean, storageManager);
+
+ // Update the last activated time.
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
+
+ // Update the bean in the map
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ }
+ else {
+ RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
+ if (rmdBean != null) {
+ rmMsgCtx.pause();
+
+ // Cleanup sending side.
+ if (log.isDebugEnabled())
+ log.debug("Terminating sending sequence " + rmdBean);
+ TerminateManager.cleanReceivingSideOnTerminateMessage(configCtx, rmdBean.getSequenceID(), storageManager);
+
+ // Update the last activated time.
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
+
+ // Update the bean in the map
+ storageManager.getRMDBeanMgr().update(rmdBean);
+
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::processSequenceUnknownFault Unable to find sequence");
+ return;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: FaultManager::processSequenceUnknownFault");
+ }
+
+ static void notifyClientsOfFault(String internalSequenceId,
+ StorageManager storageManager, ConfigurationContext configCtx, AxisFault fault) throws SandeshaStorageException {
+ // Locate and update all of the messages for this sequence, now that we know
+ // the sequence id.
+ SenderBean target = new SenderBean();
+ target.setInternalSequenceID(internalSequenceId);
+
+ Iterator iterator = storageManager.getSenderBeanMgr().find(target).iterator();
+ while (iterator.hasNext()) {
+ SenderBean tempBean = (SenderBean) iterator.next();
+
+ String messageStoreKey = tempBean.getMessageContextRefKey();
+
+ // Retrieve the message context.
+ MessageContext context = storageManager.retrieveMessageContext(messageStoreKey, configCtx);
+
+ AxisOperation axisOperation = context.getAxisOperation();
+ if (axisOperation != null)
+ {
+ MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
+ if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver))
+ {
+ Callback callback = ((CallbackReceiver)msgReceiver).lookupCallback(context.getMessageID());
+ if (callback != null)
+ {
+ callback.onError(fault);
+ }
+ }
+ }
+ }
+
+ }
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,128 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * This is used to adjust retransmission infoamation after each time the message
+ * is sent.
+ */
+
+public class MessageRetransmissionAdjuster {
+
+ private static final Log log = LogFactory.getLog(MessageRetransmissionAdjuster.class);
+
+ public static boolean adjustRetransmittion(RMMsgContext rmMsgCtx, SenderBean retransmitterBean, ConfigurationContext configContext,
+ StorageManager storageManager) throws AxisFault {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: MessageRetransmissionAdjuster::adjustRetransmittion");
+
+ String internalSequenceID = retransmitterBean.getInternalSequenceID();
+ String sequenceID = retransmitterBean.getSequenceID();
+
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, sequenceID);
+
+ // operation is the lowest level Sandesha2 could be attached.
+ SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
+
+ retransmitterBean.setSentCount(retransmitterBean.getSentCount() + 1);
+ adjustNextRetransmissionTime(retransmitterBean, propertyBean);
+
+ int maxRetransmissionAttempts = propertyBean.getMaximumRetransmissionCount();
+
+ // We can only time out sequences if we can identify the correct sequence, and
+ // we need the internal sequence id for that.
+ boolean continueSending = true;
+ if(internalSequenceID != null) {
+ boolean timeOutSequence = false;
+ if (maxRetransmissionAttempts >= 0 && retransmitterBean.getSentCount() > maxRetransmissionAttempts)
+ timeOutSequence = true;
+
+ if (timeOutSequence) {
+
+ retransmitterBean.setSend(false);
+
+ // Warn the user that the sequence has timed out
+ //if (log.isWarnEnabled())
+ // log.warn();
+
+ // Only messages of outgoing sequences get retransmitted. So named
+ // following method according to that.
+
+ SequenceManager.finalizeTimedOutSequence(internalSequenceID, rmMsgCtx.getMessageContext(), storageManager);
+ continueSending = false;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: MessageRetransmissionAdjuster::adjustRetransmittion, " + continueSending);
+ return continueSending;
+ }
+
+ /**
+ * This sets the next time the message has to be retransmitted. This uses
+ * the base retransmission interval and exponentialBackoff properties to
+ * calculate the correct time.
+ *
+ * @param retransmitterBean
+ * @param policyBean
+ * @return
+ */
+ private static SenderBean adjustNextRetransmissionTime(SenderBean retransmitterBean, SandeshaPolicyBean propertyBean) throws SandeshaException {
+
+ int count = retransmitterBean.getSentCount();
+
+ long baseInterval = propertyBean.getRetransmissionInterval();
+
+ long newInterval = baseInterval;
+ if (propertyBean.isExponentialBackoff()) {
+ newInterval = generateNextExponentialBackedoffDifference(count, baseInterval);
+ }
+
+ long newTimeToSend = 0;
+
+ long timeNow = System.currentTimeMillis();
+ newTimeToSend = timeNow + newInterval;
+
+ retransmitterBean.setTimeToSend(newTimeToSend);
+
+ return retransmitterBean;
+ }
+
+ private static long generateNextExponentialBackedoffDifference(int count, long initialInterval) {
+ long interval = initialInterval;
+ for (int i = 1; i < count; i++) {
+ interval = interval * 2;
+ }
+
+ return interval;
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/MsgInitializer.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,253 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.context.MessageContext;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.CloseSequenceResponse;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.MakeConnection;
+import org.apache.sandesha2.wsrm.RMElements;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.TerminateSequenceResponse;
+
+/**
+ * This class is used to create an RMMessageContext out of an MessageContext.
+ */
+
+public class MsgInitializer {
+
+ /**
+ * Called to create a rmMessageContext out of an message context. Finds out
+ * things like rm version and message type as well.
+ *
+ * @param ctx
+ * @param assumedRMNamespace
+ * this is used for validation (to find out weather the
+ * rmNamespace of the current message is equal to the regietered
+ * rmNamespace of the sequence). If null validation will not
+ * happen.
+ *
+ * @return
+ * @throws SandeshaException
+ */
+ public static RMMsgContext initializeMessage(MessageContext ctx) throws AxisFault {
+ RMMsgContext rmMsgCtx = new RMMsgContext(ctx);
+
+ populateRMMsgContext(ctx, rmMsgCtx);
+ validateMessage(rmMsgCtx);
+ return rmMsgCtx;
+ }
+
+ /**
+ * Adds the message parts the the RMMessageContext.
+ *
+ * @param msgCtx
+ * @param rmMsgContext
+ */
+ private static void populateRMMsgContext(MessageContext msgCtx, RMMsgContext rmMsgContext) throws AxisFault {
+
+ // if client side and the addressing version is not set. assuming the
+ // default addressing version
+ String addressingNamespace = (String) msgCtx.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+ if (addressingNamespace == null && !msgCtx.isServerSide())
+ addressingNamespace = AddressingConstants.Final.WSA_NAMESPACE;
+
+ RMElements elements = new RMElements(addressingNamespace);
+ elements.fromSOAPEnvelope(msgCtx.getEnvelope(), msgCtx.getWSAAction());
+
+ String rmNamespace = null;
+
+ if (elements.getCreateSequence() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ, elements.getCreateSequence());
+ rmNamespace = elements.getCreateSequence().getNamespaceValue();
+ }
+
+ if (elements.getCreateSequenceResponse() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE, elements
+ .getCreateSequenceResponse());
+ rmNamespace = elements.getCreateSequenceResponse().getNamespaceValue();
+ }
+
+ if (elements.getSequence() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, elements.getSequence());
+ rmNamespace = elements.getSequence().getNamespaceValue();
+ }
+
+ //In case of ack messages RM Namespace is decided based on the sequenceId of the last
+ //sequence Ack. In other words Sandesha2 does not expect to receive two SequenceAcknowledgements
+ //of different RM specifications in the same incoming message
+ for (Iterator iter = elements.getSequenceAcknowledgements();iter.hasNext();) {
+ SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) iter.next();
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, sequenceAck);
+ rmNamespace = sequenceAck.getNamespaceValue();
+ }
+
+ if (elements.getTerminateSequence() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ, elements.getTerminateSequence());
+ rmNamespace = elements.getTerminateSequence().getNamespaceValue();
+ }
+
+ if (elements.getTerminateSequenceResponse() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE, elements
+ .getTerminateSequenceResponse());
+ rmNamespace = elements.getTerminateSequenceResponse().getNamespaceValue();
+ }
+
+ //In case of ack request messages RM Namespace is decided based on the sequenceId of the last
+ //ack request.
+ for (Iterator iter = elements.getAckRequests();iter.hasNext();) {
+ AckRequested ackRequest = (AckRequested) iter.next();
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequest);
+ rmNamespace = ackRequest.getNamespaceValue();
+ }
+
+ if (elements.getCloseSequence() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE, elements.getCloseSequence());
+ rmNamespace = elements.getCloseSequence().getNamespaceValue();
+ }
+
+ if (elements.getCloseSequenceResponse() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE_RESPONSE, elements
+ .getCloseSequenceResponse());
+ rmNamespace = elements.getCloseSequenceResponse().getNamespaceValue();
+ }
+
+ if (elements.getUsesSequenceSTR() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.USES_SEQUENCE_STR, elements
+ .getUsesSequenceSTR());
+ }
+
+ if (elements.getMakeConnection() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION,
+ elements.getMakeConnection());
+ }
+
+ if (elements.getMessagePending() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING,
+ elements.getMessagePending());
+ }
+
+ if (elements.getSequenceFault() != null) {
+ rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE_FAULT,
+ elements.getSequenceFault());
+ }
+
+ rmMsgContext.setRMNamespaceValue(rmNamespace);
+
+ }
+
+ /**
+ * This is used to validate the message. Also set an Message type. Possible
+ * types are given in the Sandesha2Constants.MessageTypes interface.
+ *
+ * @param rmMsgCtx
+ * @return
+ * @throws SandeshaException
+ */
+ private static boolean validateMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ String sequenceID = null;
+
+ CreateSequence createSequence = (CreateSequence) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.CREATE_SEQ);
+ CreateSequenceResponse createSequenceResponse = (CreateSequenceResponse) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+ TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ TerminateSequenceResponse terminateSequenceResponse = (TerminateSequenceResponse) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
+ Iterator sequenceAcknowledgementsIter = rmMsgCtx.getMessageParts(
+ Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.SEQUENCE);
+ Iterator ackRequestedIter = rmMsgCtx.getMessageParts(
+ Sandesha2Constants.MessageParts.ACK_REQUEST);
+ CloseSequence closeSequence = (CloseSequence) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+ CloseSequenceResponse closeSequenceResponse = (CloseSequenceResponse) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.CLOSE_SEQUENCE_RESPONSE);
+ MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(
+ Sandesha2Constants.MessageParts.MAKE_CONNECTION);
+
+ // Setting message type.
+ if (createSequence != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+ } else if (createSequenceResponse != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE);
+ sequenceID = createSequenceResponse.getIdentifier().getIdentifier();
+ } else if (terminateSequence != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
+ sequenceID = terminateSequence.getIdentifier().getIdentifier();
+ } else if (terminateSequenceResponse != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE);
+ sequenceID = terminateSequenceResponse.getIdentifier().getIdentifier();
+ } else if (rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE) != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ sequenceID = sequence.getIdentifier().getIdentifier();
+ } else if (sequenceAcknowledgementsIter.hasNext()) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAcknowledgementsIter.next();
+
+ //if there is only on sequenceAck, sequenceId will be set. Otherwise it will not be.
+ if (!sequenceAcknowledgementsIter.hasNext())
+ sequenceID = sequenceAcknowledgement.getIdentifier().getIdentifier();
+ } else if (ackRequestedIter.hasNext()) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
+ AckRequested ackRequest = (AckRequested) ackRequestedIter.next();
+
+ //if there is only on sequenceAck, sequenceId will be set. Otherwise it will not be.
+ if (!ackRequestedIter.hasNext())
+ sequenceID = ackRequest.getIdentifier().getIdentifier();
+ } else if (closeSequence != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE);
+ sequenceID = closeSequence.getIdentifier().getIdentifier();
+ } else if (closeSequenceResponse != null) {
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE_RESPONSE);
+ sequenceID = closeSequenceResponse.getIdentifier().getIdentifier();
+ } else if (makeConnection != null){
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
+ if (makeConnection.getIdentifier()!=null) {
+ sequenceID = makeConnection.getIdentifier().getIdentifier();
+ } else if (makeConnection.getAddress()!=null){
+ //TODO get sequenceId based on the anonymous address.
+ } else {
+ throw new SandeshaException (
+ "Invalid MakeConnection message. Either Address or Identifier must be present");
+ }
+ } else
+ rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.UNKNOWN);
+
+ if (sequenceID!=null)
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceID);
+
+ return true;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org