You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/06/15 07:51:24 UTC
svn commit: r414476 [6/15] - in /webservices/sandesha/trunk: ./ c/ config/
interop/ java/ java/config/ java/interop/ java/interop/conf/
java/interop/src/ java/interop/src/org/ java/interop/src/org/apache/
java/interop/src/org/apache/sandesha2/ java/int...
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,301 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CreateSequenceResponse;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing an incoming Create Sequence Response message.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class CreateSeqResponseMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
+
+ public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx)
+ throws SandeshaException {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(createSeqResponseRMMsgCtx.getSOAPEnvelope()));
+
+ MessageContext createSeqResponseMsg = createSeqResponseRMMsgCtx.getMessageContext();
+ ConfigurationContext configCtx = createSeqResponseRMMsgCtx
+ .getMessageContext().getConfigurationContext();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx,configCtx.getAxisConfiguration());
+
+ //Processing for ack if available
+
+ SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) createSeqResponseRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ if (sequenceAck != null) {
+ AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
+ ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
+ }
+
+ //Processing the create sequence response.
+
+ CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+ if (createSeqResponsePart == null) {
+ String message = "Create Sequence Response part is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String newOutSequenceId = createSeqResponsePart.getIdentifier()
+ .getIdentifier();
+ if (newOutSequenceId == null) {
+ String message = "New sequence Id is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ RelatesTo relatesTo = createSeqResponseRMMsgCtx.getMessageContext()
+ .getRelatesTo();
+ if (relatesTo==null) {
+ String message = "Invalid create sequence message. RelatesTo part is not available";
+ log.error("Invalid create sequence response message. RelatesTo part is not available");
+ throw new SandeshaException ("Invalid create sequence message. RelatesTo part is not available");
+ }
+ String createSeqMsgId = relatesTo.getValue();
+
+
+ SenderBeanMgr retransmitterMgr = storageManager
+ .getRetransmitterBeanMgr();
+ CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
+
+ CreateSeqBean createSeqBean = createSeqMgr.retrieve(createSeqMsgId);
+ if (createSeqBean == null) {
+ String message = "Create Sequence entry is not found";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String internalSequenceId = createSeqBean.getInternalSequenceID();
+ if (internalSequenceId == null || "".equals(internalSequenceId)) {
+ String message = "TempSequenceId has is not set";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ createSeqBean.setSequenceID(newOutSequenceId);
+ createSeqMgr.update(createSeqBean);
+
+ SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
+ if (createSequenceSenderBean==null)
+ throw new SandeshaException ("Create sequence entry is not found");
+
+ //removing the Create Sequence Message from the storage
+ String createSeqStorageKey = createSequenceSenderBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(createSeqStorageKey);
+
+ //deleting the create sequence entry.
+ retransmitterMgr.delete(createSeqMsgId);
+
+ //storing new out sequence id
+ SequencePropertyBeanMgr sequencePropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+ SequencePropertyBean outSequenceBean = new SequencePropertyBean(
+ internalSequenceId, Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,
+ newOutSequenceId);
+ SequencePropertyBean internalSequenceBean = new SequencePropertyBean(
+ newOutSequenceId,
+ Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
+
+
+ sequencePropMgr.insert(outSequenceBean);
+ sequencePropMgr.insert(internalSequenceBean);
+
+ //processing for accept (offer has been sent)
+ Accept accept = createSeqResponsePart.getAccept();
+ if (accept != null) {
+ //Find offered sequence from internal sequence id.
+ SequencePropertyBean offeredSequenceBean = sequencePropMgr
+ .retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
+
+ //TODO this should be detected in the Fault manager.
+ if (offeredSequenceBean == null) {
+ String message = "No offered sequence entry. But an accept was received";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String offeredSequenceId = (String) offeredSequenceBean.getValue();
+
+ EndpointReference acksToEPR = accept.getAcksTo().getAddress()
+ .getEpr();
+ SequencePropertyBean acksToBean = new SequencePropertyBean();
+ acksToBean.setName(Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+ acksToBean.setSequenceID(offeredSequenceId);
+ acksToBean.setValue(acksToEPR.getAddress());
+
+ sequencePropMgr.insert(acksToBean);
+
+ NextMsgBean nextMsgBean = new NextMsgBean();
+ nextMsgBean.setSequenceID(offeredSequenceId);
+ nextMsgBean.setNextMsgNoToProcess(1);
+
+ NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr();
+ nextMsgMgr.insert(nextMsgBean);
+
+ String rmSpecVersion = createSeqResponseRMMsgCtx.getRMSpecVersion();
+
+ SequencePropertyBean specVersionBean = new SequencePropertyBean (
+ offeredSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION,rmSpecVersion);
+ sequencePropMgr.insert(specVersionBean);
+
+ SequencePropertyBean receivedMsgBean = new SequencePropertyBean(
+ offeredSequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES, "");
+ sequencePropMgr.insert(receivedMsgBean);
+
+ SequencePropertyBean msgsBean = new SequencePropertyBean();
+ msgsBean.setSequenceID(offeredSequenceId);
+ msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
+ msgsBean.setValue("");
+ sequencePropMgr.insert(msgsBean);
+
+
+ //setting the addressing version.
+ String addressingNamespace = createSeqResponseRMMsgCtx.getAddressingNamespaceValue();
+ SequencePropertyBean addressingVersionBean = new SequencePropertyBean (
+ offeredSequenceId,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,addressingNamespace);
+ sequencePropMgr.insert(addressingVersionBean);
+
+ }
+
+ SenderBean target = new SenderBean();
+ target.setInternalSequenceID(internalSequenceId);
+ target.setSend(false);
+ target.setReSend(true);
+
+ Iterator iterator = retransmitterMgr.find(target).iterator();
+ while (iterator.hasNext()) {
+ SenderBean tempBean = (SenderBean) iterator.next();
+
+ //updating the application message
+ String key = tempBean.getMessageContextRefKey();
+ MessageContext applicationMsg = storageManager.retrieveMessageContext(key,configCtx);
+
+ //TODO make following exception message more understandable to the user (probably some others exceptions messages as well)
+ if (applicationMsg==null)
+ throw new SandeshaException ("Unavailable application message");
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,storageManager);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+
+ String assumedRMNamespace = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+
+ RMMsgContext applicaionRMMsg = MsgInitializer
+ .initializeMessage(applicationMsg);
+
+ Sequence sequencePart = (Sequence) applicaionRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart == null) {
+ String message = "Sequence part is null";
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ Identifier identifier = new Identifier(factory,assumedRMNamespace);
+ identifier.setIndentifer(newOutSequenceId);
+
+ sequencePart.setIdentifier(identifier);
+
+ AckRequested ackRequestedPart = (AckRequested) applicaionRMMsg
+ .getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
+ if (ackRequestedPart != null) {
+ Identifier id1 = new Identifier(factory,assumedRMNamespace);
+ id1.setIndentifer(newOutSequenceId);
+ ackRequestedPart.setIdentifier(id1);
+ }
+
+ try {
+ applicaionRMMsg.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ //asking to send the application msssage
+ tempBean.setSend(true);
+ retransmitterMgr.update(tempBean);
+
+ //updating the message. this will correct the SOAP envelope string.
+ storageManager.updateMessageContext(key,applicationMsg);
+ }
+
+ SequenceManager.updateLastActivatedTime(internalSequenceId,storageManager);
+
+ createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
+ .setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
+ "false");
+
+ createSeqResponseRMMsgCtx.pause();
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
+ }
+
+ /**
+  * Out path handling for create sequence responses. Nothing is done here
+  * apart from tracing entry and exit when debug logging is enabled.
+  *
+  * @param rmMsgCtx the outgoing message (not inspected)
+  */
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+     if (!log.isDebugEnabled()) {
+         return;
+     }
+
+     log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
+     log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * The message processor interface. An implementation handles one kind of
+ * reliable messaging message, with separate entry points for the in and
+ * the out direction.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * @author Sanka Samaranayaka <ss...@gmail.com>
+ */
+
+public interface MsgProcessor {
+
+    /**
+     * Processes a message on the in path.
+     *
+     * @param rmMsgCtx the message to process
+     * @throws SandeshaException if the message cannot be processed
+     */
+    void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException;
+
+    /**
+     * Processes a message on the out path.
+     *
+     * @param rmMsgCtx the message to process
+     * @throws SandeshaException if the message cannot be processed
+     */
+    void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException;
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+
+/**
+ * Used to get a suitable message processor. Given the message type.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class MsgProcessorFactory {
+
+ public static MsgProcessor getMessageProcessor(RMMsgContext rmMessageContext) {
+
+ int messageType = rmMessageContext.getMessageType();
+
+ switch (messageType) {
+ case (Sandesha2Constants.MessageTypes.CREATE_SEQ):
+ return new CreateSeqMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.TERMINATE_SEQ):
+ return new TerminateSeqMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE):
+ return new TerminateSeqResponseMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.APPLICATION):
+ return new ApplicationMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE):
+ return new CreateSeqResponseMsgProcessor();
+ case (Sandesha2Constants.MessageTypes.ACK):
+ return new AcknowledgementProcessor();
+ case (Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE):
+ return new CloseSequenceProcessor ();
+ case (Sandesha2Constants.MessageTypes.ACK_REQUEST):
+ return new AckRequestedProcessor ();
+ default:
+ return null;
+ }
+ }
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,415 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.OutInAxisOperation;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.Sequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+
+/**
+ * Responsible for processing an incoming Terminate Sequence message.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+
+public class TerminateSeqMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
+
+ /**
+  * Processes an incoming Terminate Sequence message.
+  *
+  * A piggybacked acknowledgement is processed first. If the fault manager
+  * produces a fault for the sequence id, that fault is sent and processing
+  * stops. Otherwise the termination is recorded in the sequence properties,
+  * a terminate sequence response is added when the RM spec version requires
+  * one, the highest message numbers are set up and the receiving side is
+  * cleaned. In both cases the incoming message is paused at the end.
+  *
+  * @param terminateSeqRMMsg the received terminate sequence message
+  * @throws SandeshaException if the terminate part or its sequence id is
+  *         missing, or if a fault or response message cannot be sent
+  */
+ public void processInMessage(RMMsgContext terminateSeqRMMsg)
+         throws SandeshaException {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
+     }
+
+     MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+
+     // Processing for ack if any.
+     SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) terminateSeqRMMsg
+             .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+     if (sequenceAck != null) {
+         AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
+         ackProcessor.processInMessage(terminateSeqRMMsg);
+     }
+
+     // Processing the terminate message.
+     //TODO Add terminate sequence message logic.
+     TerminateSequence terminateSequence = (TerminateSequence) terminateSeqRMMsg
+             .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+     if (terminateSequence == null) {
+         String message = "Terminate Sequence part is not available";
+         log.debug(message);
+         throw new SandeshaException (message);
+     }
+
+     // A terminate part without an Identifier is reported as an invalid
+     // sequence id instead of failing with a NullPointerException.
+     String sequenceId = null;
+     if (terminateSequence.getIdentifier() != null) {
+         sequenceId = terminateSequence.getIdentifier().getIdentifier();
+     }
+     if (sequenceId == null || "".equals(sequenceId)) {
+         String message = "Invalid sequence id";
+         log.debug(message);
+         throw new SandeshaException (message);
+     }
+
+     ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(
+             context, context.getAxisConfiguration());
+
+     FaultManager faultManager = new FaultManager();
+     RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(
+             terminateSeqRMMsg, sequenceId, storageManager);
+     if (faultMessageContext != null) {
+         AxisEngine engine = new AxisEngine(context);
+
+         try {
+             engine.sendFault(faultMessageContext.getMessageContext());
+         } catch (AxisFault e) {
+             throw new SandeshaException ("Could not send the fault message",e);
+         }
+
+         terminateSeqMsg.pause();
+         return;
+     }
+
+     SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+     SequencePropertyBean terminateReceivedBean = new SequencePropertyBean ();
+     terminateReceivedBean.setSequenceID(sequenceId);
+     terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
+     terminateReceivedBean.setValue("true");
+
+     sequencePropertyBeanMgr.insert(terminateReceivedBean);
+
+     // Add the terminate sequence response if required.
+     if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion())) {
+         addTerminateSequenceResponse(terminateSeqRMMsg, sequenceId, storageManager);
+     }
+
+     setUpHighestMsgNumbers(context, storageManager, sequenceId, terminateSeqRMMsg);
+
+     TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);
+
+     SequencePropertyBean terminatedBean = new SequencePropertyBean(
+             sequenceId,
+             Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,
+             Sandesha2Constants.VALUE_TRUE);
+
+     sequencePropertyBeanMgr.insert(terminatedBean);
+
+     // The transport name that used to be read here was never used; the
+     // lookup only added a possible NullPointerException, so it is removed.
+
+     SequenceManager.updateLastActivatedTime(sequenceId, storageManager);
+
+     terminateSeqMsg.pause();
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
+     }
+ }
+
+ /**
+  * Records the last in message number of the terminated sequence and, when
+  * the response to the highest in message carries a Sequence part, the last
+  * out message number of the response side sequence. If all out messages up
+  * to that number are acknowledged, a terminate sequence message is added
+  * for the outgoing sequence.
+  *
+  * @param configCtx configuration context used to load the stored message
+  * @param storageManager storage holding the sequence properties and messages
+  * @param sequenceID id of the sequence being terminated
+  * @param terminateRMMsg the received terminate sequence message
+  * @throws SandeshaException if the stored data is inconsistent or an
+  *         Axis2 fault occurs while reading the out message
+  */
+ private void setUpHighestMsgNumbers (ConfigurationContext configCtx, StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) throws SandeshaException {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: TerminateSeqMsgProcessor::setUpHighestMsgNumbers, "+sequenceID);
+     }
+
+     SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+     String highestImMsgNumberStr = SandeshaUtil.getSequenceProperty(
+             sequenceID, Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER, storageManager);
+     String highestImMsgKey = SandeshaUtil.getSequenceProperty(
+             sequenceID, Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY, storageManager);
+
+     long highestInMsgNo = 0;
+     if (highestImMsgNumberStr != null) {
+         if (highestImMsgKey == null) {
+             throw new SandeshaException ("Key of the highest message number has not been stored");
+         }
+
+         highestInMsgNo = Long.parseLong(highestImMsgNumberStr);
+     }
+
+     // Following will be valid only for the server side, since the obtained int. seq ID is only valid there.
+     String responseSideInternalSequenceID = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceID);
+
+     long highestOutMsgNo = 0;
+     try {
+         boolean addResponseSideTerminate = false;
+
+         if (highestInMsgNo != 0) {
+
+             // Setting the last in message property.
+             SequencePropertyBean lastInMsgBean = new SequencePropertyBean(
+                     sequenceID,
+                     Sandesha2Constants.SequenceProperties.LAST_IN_MESSAGE_NO,
+                     highestImMsgNumberStr);
+             seqPropMgr.insert(lastInMsgBean);
+
+             MessageContext highestInMsg = storageManager.retrieveMessageContext(highestImMsgKey, configCtx);
+             if (highestInMsg == null) {
+                 // The storage can return null for a key; fail with a
+                 // descriptive error rather than a NullPointerException.
+                 throw new SandeshaException ("The highest in message is not available in the storage");
+             }
+
+             //TODO get the out message in a storage friendly manner.
+             OperationContext highestInMsgOpCtx = highestInMsg.getOperationContext();
+             MessageContext highestOutMessage = null;
+             if (highestInMsgOpCtx != null) {
+                 highestOutMessage = highestInMsgOpCtx.getMessageContext(
+                         OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
+             }
+
+             if (highestOutMessage != null) {
+                 RMMsgContext highestOutRMMsg = MsgInitializer.initializeMessage(highestOutMessage);
+                 Sequence seqPartOfOutMsg = (Sequence) highestOutRMMsg
+                         .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+
+                 if (seqPartOfOutMsg != null) {
+
+                     // Response message of the last in message can be considered as the last out message.
+                     highestOutMsgNo = seqPartOfOutMsg.getMessageNumber().getMessageNumber();
+                     SequencePropertyBean highestOutMsgBean = new SequencePropertyBean(
+                             responseSideInternalSequenceID,
+                             Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,
+                             Long.toString(highestOutMsgNo));
+
+                     seqPropMgr.insert(highestOutMsgBean);
+                     addResponseSideTerminate = true;
+                 }
+             }
+         }
+
+         // If all the out message have been acked, add the outgoing terminate seq msg.
+         String outgoingSqunceID = SandeshaUtil.getSequenceProperty(
+                 responseSideInternalSequenceID,
+                 Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,
+                 storageManager);
+         if (addResponseSideTerminate && highestOutMsgNo > 0
+                 && responseSideInternalSequenceID != null && outgoingSqunceID != null) {
+             boolean allAcked = SandeshaUtil.isAllMsgsAckedUpto(
+                     highestOutMsgNo, responseSideInternalSequenceID, storageManager);
+
+             if (allAcked) {
+                 TerminateManager.addTerminateSequenceMessage(
+                         terminateRMMsg, outgoingSqunceID, responseSideInternalSequenceID, storageManager);
+             }
+         }
+     } catch (AxisFault e) {
+         throw new SandeshaException (e);
+     }
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
+     }
+ }
+
+ /**
+  * Creates the terminate sequence response for the given terminate message,
+  * attaches a sequence acknowledgement to it and sends it. Afterwards the
+  * RESPONSE_WRITTEN property of the incoming operation context is set to
+  * "true" when the To address of the incoming message equals the anonymous
+  * URI of the sequence's addressing namespace, and to "false" otherwise.
+  *
+  * @param terminateSeqRMMsg the received terminate sequence message
+  * @param sequenceID id of the sequence being terminated
+  * @param storageManager storage holding the sequence properties
+  * @throws SandeshaException if the response cannot be built or sent
+  */
+ private void addTerminateSequenceResponse (RMMsgContext terminateSeqRMMsg, String sequenceID, StorageManager storageManager) throws SandeshaException {
+
+     if (log.isDebugEnabled()) {
+         log.debug("Enter: TerminateSeqMsgProcessor::addTerminateSequenceResponse, " + sequenceID);
+     }
+
+     MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();
+     MessageContext outMessage = Utils.createOutMessageContext(terminateSeqMsg);
+
+     RMMsgContext terminateSeqResponseRMMsg = RMMsgCreator
+             .createTerminateSeqResponseMsg(terminateSeqRMMsg, outMessage, storageManager);
+
+     // Piggyback an acknowledgement for the sequence on the response.
+     RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(
+             terminateSeqRMMsg, sequenceID, storageManager);
+     SequenceAcknowledgement seqAck = (SequenceAcknowledgement) ackRMMessage
+             .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+     terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck);
+
+     terminateSeqResponseRMMsg.addSOAPEnvelope();
+
+     terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+     terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+     outMessage.setResponseWritten(true);
+
+     AxisEngine engine = new AxisEngine(terminateSeqMsg.getConfigurationContext());
+
+     EndpointReference toEPR = terminateSeqMsg.getTo();
+
+     try {
+         engine.send(outMessage);
+     } catch (AxisFault e) {
+         String message = "Could not send the terminate sequence response";
+         throw new SandeshaException (message,e);
+     }
+
+     String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(
+             sequenceID, Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE, storageManager);
+     String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
+
+     String responseWritten = anonymousURI.equals(toEPR.getAddress()) ? "true" : "false";
+     terminateSeqMsg.getOperationContext().setProperty(
+             org.apache.axis2.Constants.RESPONSE_WRITTEN, responseWritten);
+
+     if (log.isDebugEnabled()) {
+         log.debug("Exit: TerminateSeqMsgProcessor::addTerminateSequenceResponse");
+     }
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ if (log.isDebugEnabled())
+ log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+ Options options = msgContext.getOptions();
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ String toAddress = rmMsgCtx.getTo().getAddress();
+ String sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ String internalSeqenceID = SandeshaUtil.getInternalSequenceID(toAddress,sequenceKey);
+
+ String outSequenceID = SandeshaUtil.getSequenceProperty(internalSeqenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,storageManager);
+ if (outSequenceID==null)
+ throw new SandeshaException ("SequenceID was not found. Cannot send the terminate message");
+
+/// Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+
+ String terminated = SandeshaUtil.getSequenceProperty(outSequenceID,
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,storageManager);
+
+
+ //registring an InOutOperationContext for this.
+ //since the serviceContext.fireAndForget only sets a inOnly One
+ //this does not work when there is a terminateSequnceResponse
+ //TODO do processing of terminateMessagesCorrectly., create a new message instead of sendign the one given by the serviceClient
+ //TODO important
+ try {
+ AxisOperation oldOPeration = msgContext.getAxisOperation();
+ AxisOperation outInAxisOp = new OutInAxisOperation (new QName ("temp"));
+ //setting flows
+ outInAxisOp.setRemainingPhasesInFlow(oldOPeration.getRemainingPhasesInFlow());
+
+ OperationContext opcontext = OperationContextFactory.createOperationContext(OperationContextFactory.MEP_CONSTANT_OUT_IN,outInAxisOp);
+ opcontext.setParent(msgContext.getServiceContext());
+ configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+ } catch (AxisFault e1) {
+ throw new SandeshaException ("Could not register an outInAxisOperation");
+ }
+
+ if (terminated != null
+ && "true".equals(terminated)) {
+ String message = "Terminate was added previously.";
+ log.debug(message);
+ return;
+ }
+
+ TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
+
+ rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+ msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSeqenceID,storageManager);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+
+ rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ String transportTo = SandeshaUtil.getSequenceProperty(internalSeqenceID,Sandesha2Constants.SequenceProperties.TRANSPORT_TO,storageManager);
+ if (transportTo!=null) {
+ rmMsgCtx.setProperty(MessageContextConstants.TRANSPORT_URL,transportTo);
+ }
+
+ try {
+ rmMsgCtx.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setMessageContextRefKey(key);
+
+
+ storageManager.storeMessageContext(key,msgContext);
+
+
+ //Set a retransmitter lastSentTime so that terminate will be send with
+ // some delay.
+ //Otherwise this get send before return of the current request (ack).
+ //TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis()
+ + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(msgContext.getMessageID());
+
+ //this will be set to true at the sender.
+ terminateBean.setSend(true);
+
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager
+ .getRetransmitterBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequenceID(outSequenceID);
+ terminateAdded.setValue("true");
+
+
+ seqPropMgr.insert(terminateAdded);
+
+ //This should be dumped to the storage by the sender
+ TransportOutDescription transportOut = msgContext.getTransportOut();
+ rmMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+ rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+ rmMsgCtx.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+/// addTerminateSeqTransaction.commit();
+
+ AxisEngine engine = new AxisEngine (configurationContext);
+ try {
+ engine.send(msgContext);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
+ }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.SandeshaException;
+
+/**
+ * To process terminate sequence response messages.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+public class TerminateSeqResponseMsgProcessor implements MsgProcessor {
+
+    private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
+
+    /**
+     * Handles an incoming terminate sequence response message.
+     *
+     * No response-specific processing is implemented yet; the only action
+     * taken is to call pause() on the supplied message context.
+     *
+     * @param terminateResRMMsg the RM message context of the received response
+     * @throws SandeshaException declared by the MsgProcessor contract; not
+     *             thrown directly by this implementation
+     */
+    public void processInMessage(RMMsgContext terminateResRMMsg)
+            throws SandeshaException {
+
+        if (log.isDebugEnabled())
+            log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
+
+        //TODO add processing logic
+
+        terminateResRMMsg.pause();
+
+        if (log.isDebugEnabled())
+            log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
+    }
+
+    /**
+     * Outbound hook for terminate sequence response messages. Currently a
+     * no-op.
+     *
+     * @param rmMsgCtx the RM message context of the outgoing message (unused)
+     * @throws SandeshaException declared by the MsgProcessor contract; never
+     *             thrown by this implementation
+     */
+    public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+        // Nothing to do for outgoing messages.
+    }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.msgreceivers;
+
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * @author Jaliya Ekanayaka <ja...@opensource.lk>
+ */
+
+//Currently this is a dummy Msg Receiver.
+//All the necessary RM logic happens at MessageProcessors.
+//This only ensures that the default Message Receiver does not get called for RM control messages.
+
+public class RMMessageReceiver extends AbstractMessageReceiver {
+
+    private static final Log log = LogFactory.getLog(RMMessageReceiver.class.getName());
+
+    /**
+     * Receives an RM control message. The message is initialized into an
+     * RMMsgContext and its type is logged at debug level; nothing else is
+     * done with it here.
+     *
+     * @param messgeCtx the message context handed over by the engine
+     * @throws AxisFault if initializing the RM message context fails
+     */
+    public final void receive(MessageContext messgeCtx) throws AxisFault {
+        log.debug("RM MESSSAGE RECEIVER WAS CALLED");
+
+        // Always initialize, so that failures surface regardless of log level.
+        RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messgeCtx);
+
+        // Only build the message (string concatenation plus a lookup) when
+        // it will actually be written.
+        if (log.isDebugEnabled()) {
+            log.debug("MsgReceiver got type:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+        }
+    }
+
+}
\ No newline at end of file
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/PolicyEngineData.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/PolicyEngineData.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/PolicyEngineData.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/PolicyEngineData.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,268 @@
+package org.apache.sandesha2.policy;
+
+/**
+ * Mutable holder for the RM policy settings collected while a policy is
+ * processed.
+ *
+ * Every setting is paired with a boolean "...Set" flag that is switched to
+ * true by the setting's setter. {@link #copy()} transfers only those settings
+ * whose flag is true.
+ */
+public class PolicyEngineData {
+
+    private long acknowledgementInterval = -1;
+    private boolean acknowledgementIntervalSet = false;
+
+    private boolean exponentialBackoff = false;
+    private boolean exponentialBackoffSet = false;
+
+    private long inactivityTimeout = -1;
+    private boolean inactivityTimeoutSet = false;
+
+    private String inactivityTimeoutMeassure = null;
+    private boolean inactivityTimeoutMeassureSet = false;
+
+    private boolean invokeInOrder = true;
+    private boolean invokeInOrderSet = false;
+
+    private String messageTypesToDrop = null;
+    private boolean messageTypesToDropSet = false;
+
+    private long retransmissionInterval = -1;
+    private boolean retransmissionIntervalSet = false;
+
+    private String permanentStorageManager = null;
+    private boolean permanentStorageManagerSet = false;
+
+    private String inMemoryStorageManager = null;
+    private boolean inMemoryStorageManagerSet = false;
+
+    private int maximumRetransmissionCount;
+    private boolean maximumRetransmissionCountSet = false;
+
+    /** Hook for populating default values; currently does nothing. */
+    public void initializeWithDefaults() {
+
+    }
+
+    /**
+     * Creates a new instance carrying over every setting whose "...Set" flag
+     * is true on this instance. Settings that were never flagged keep the
+     * initial value of a freshly constructed object.
+     *
+     * @return the new, independent instance
+     */
+    public PolicyEngineData copy() {
+        PolicyEngineData duplicate = new PolicyEngineData();
+
+        if (isAcknowledgementIntervalSet()) {
+            duplicate.setAcknowledgementInterval(getAcknowledgementInterval());
+        }
+        if (isExponentialBackoffSet()) {
+            duplicate.setExponentialBackoff(isExponentialBackoff());
+        }
+        if (isInactivityTimeoutSet()) {
+            duplicate.setInactivityTimeout(getInactivityTimeout());
+        }
+        if (isInactivityTimeoutMeassureSet()) {
+            duplicate.setInactivityTimeoutMeassure(getInactivityTimeoutMeassure());
+        }
+        if (isInvokeInOrderSet()) {
+            duplicate.setInvokeInOrder(isInvokeInOrder());
+        }
+        if (isMessageTypesToDropSet()) {
+            duplicate.setMessageTypesToDrop(getMessageTypesToDrop());
+        }
+        if (isRetransmissionIntervalSet()) {
+            duplicate.setRetransmissionInterval(getRetransmissionInterval());
+        }
+        if (isInMemoryStorageManagerSet()) {
+            duplicate.setInMemoryStorageManager(getInMemoryStorageManager());
+        }
+        if (isPermanentStorageManagerSet()) {
+            duplicate.setPermanentStorageManager(getPermanentStorageManager());
+        }
+        if (isMaximumRetransmissionCountSet()) {
+            duplicate.setMaximumRetransmissionCount(getMaximumRetransmissionCount());
+        }
+
+        return duplicate;
+    }
+
+    // ---- acknowledgement interval ----
+
+    public long getAcknowledgementInterval() {
+        return acknowledgementInterval;
+    }
+
+    public void setAcknowledgementInterval(long acknowledgementInterval) {
+        this.acknowledgementInterval = acknowledgementInterval;
+        this.acknowledgementIntervalSet = true;
+    }
+
+    public boolean isAcknowledgementIntervalSet() {
+        return acknowledgementIntervalSet;
+    }
+
+    // ---- exponential backoff ----
+
+    public boolean isExponentialBackoff() {
+        return exponentialBackoff;
+    }
+
+    public void setExponentialBackoff(boolean exponentialBackoff) {
+        this.exponentialBackoff = exponentialBackoff;
+        this.exponentialBackoffSet = true;
+    }
+
+    public boolean isExponentialBackoffSet() {
+        return exponentialBackoffSet;
+    }
+
+    // ---- inactivity timeout ----
+
+    public long getInactivityTimeout() {
+        return inactivityTimeout;
+    }
+
+    public void setInactivityTimeout(long inactivityTimeout) {
+        this.inactivityTimeout = inactivityTimeout;
+        this.inactivityTimeoutSet = true;
+    }
+
+    public boolean isInactivityTimeoutSet() {
+        return inactivityTimeoutSet;
+    }
+
+    // ---- inactivity timeout measure ----
+
+    public String getInactivityTimeoutMeassure() {
+        return inactivityTimeoutMeassure;
+    }
+
+    public void setInactivityTimeoutMeassure(String inactivityTimeoutMeassure) {
+        this.inactivityTimeoutMeassure = inactivityTimeoutMeassure;
+        this.inactivityTimeoutMeassureSet = true;
+    }
+
+    public boolean isInactivityTimeoutMeassureSet() {
+        return inactivityTimeoutMeassureSet;
+    }
+
+    // ---- invoke in order ----
+
+    public boolean isInvokeInOrder() {
+        return invokeInOrder;
+    }
+
+    public void setInvokeInOrder(boolean invokeInOrder) {
+        this.invokeInOrder = invokeInOrder;
+        this.invokeInOrderSet = true;
+    }
+
+    public boolean isInvokeInOrderSet() {
+        return invokeInOrderSet;
+    }
+
+    // ---- message types to drop ----
+
+    public String getMessageTypesToDrop() {
+        return messageTypesToDrop;
+    }
+
+    public void setMessageTypesToDrop(String messageTypesToDrop) {
+        this.messageTypesToDrop = messageTypesToDrop;
+        this.messageTypesToDropSet = true;
+    }
+
+    public boolean isMessageTypesToDropSet() {
+        return messageTypesToDropSet;
+    }
+
+    // ---- retransmission interval ----
+
+    public long getRetransmissionInterval() {
+        return retransmissionInterval;
+    }
+
+    public void setRetransmissionInterval(long retransmissionInterval) {
+        this.retransmissionInterval = retransmissionInterval;
+        this.retransmissionIntervalSet = true;
+    }
+
+    public boolean isRetransmissionIntervalSet() {
+        return retransmissionIntervalSet;
+    }
+
+    // ---- permanent storage manager ----
+
+    public String getPermanentStorageManager() {
+        return permanentStorageManager;
+    }
+
+    public void setPermanentStorageManager(String permanentStorageManager) {
+        this.permanentStorageManager = permanentStorageManager;
+        this.permanentStorageManagerSet = true;
+    }
+
+    public boolean isPermanentStorageManagerSet() {
+        return permanentStorageManagerSet;
+    }
+
+    // ---- in-memory storage manager ----
+
+    public String getInMemoryStorageManager() {
+        return inMemoryStorageManager;
+    }
+
+    public void setInMemoryStorageManager(String inMemoryStorageManager) {
+        this.inMemoryStorageManager = inMemoryStorageManager;
+        this.inMemoryStorageManagerSet = true;
+    }
+
+    public boolean isInMemoryStorageManagerSet() {
+        return inMemoryStorageManagerSet;
+    }
+
+    // ---- maximum retransmission count ----
+
+    public int getMaximumRetransmissionCount() {
+        return maximumRetransmissionCount;
+    }
+
+    public void setMaximumRetransmissionCount(int maximumRetransmissionCount) {
+        this.maximumRetransmissionCount = maximumRetransmissionCount;
+        setMaximumRetransmissionCountSet(true);
+    }
+
+    public boolean isMaximumRetransmissionCountSet() {
+        return maximumRetransmissionCountSet;
+    }
+
+    /**
+     * Unlike the other flags, this one can be changed directly by callers.
+     */
+    public void setMaximumRetransmissionCountSet(boolean maximumRetransmissionCountSet) {
+        this.maximumRetransmissionCountSet = maximumRetransmissionCountSet;
+    }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicy.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicy.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicy.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicy.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,45 @@
+package org.apache.sandesha2.policy;
+
+
+/**
+ * Template tokens for the RM policy assertions. Each constant names one
+ * assertion element and records its token type and attribute names.
+ */
+public class RMPolicy {
+
+    public static final RMPolicyToken retransmissionIterval = new RMPolicyToken(
+            "RetransmissionInterval", RMPolicyToken.SIMPLE_TOKEN,
+            new String[] { "Milliseconds" });
+
+    public static final RMPolicyToken acknowledgementInterval = simpleToken("AcknowledgementInterval");
+
+    public static final RMPolicyToken maximumRetransmissionCount = simpleToken("MaximumRetransmissionCount");
+
+    public static final RMPolicyToken exponentialBackoff = simpleToken("ExponentialBackoff");
+
+    public static final RMPolicyToken inactiveTimeout = simpleToken("InactivityTimeout");
+
+    public static final RMPolicyToken inactiveTimeoutMeasure = simpleToken("InactivityTimeoutMeasure");
+
+    public static final RMPolicyToken invokeInOrder = simpleToken("InvokeInOrder");
+
+    public static final RMPolicyToken messageTypeToDrop = simpleToken("MessageTypesToDrop");
+
+    // The only complex token: it is the parent of the two storage manager tokens below.
+    public static final RMPolicyToken storageManagers = new RMPolicyToken(
+            "StorageManagers", RMPolicyToken.COMPLEX_TOKEN, new String[] {});
+
+    public static final RMPolicyToken inMemoryStorageManager = simpleToken("InMemoryStorageManager");
+
+    public static final RMPolicyToken permanentStorageManager = simpleToken("PermanentStorageManager");
+
+    /**
+     * Builds a simple token with the given element name and an empty
+     * attribute list. A fresh attribute array is created for every token.
+     */
+    private static RMPolicyToken simpleToken(String tokenName) {
+        return new RMPolicyToken(tokenName, RMPolicyToken.SIMPLE_TOKEN, new String[] {});
+    }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyBean.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyBean.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyBean.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Used to hold RM Policy information.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ * @author Sanka Samaranayaka <ss...@gmail.com>
+ */
+
+package org.apache.sandesha2.policy;
+
+
+public class RMPolicyBean {
+    private long inactiveTimeoutInterval;
+    private long acknowledgementInterval;
+    private long retransmissionInterval;
+    private boolean exponentialBackoff;
+    private int maximumRetransmissionCount;
+
+    /**
+     * Creates a bean and runs the (currently empty) property-file loading
+     * step, so all values start at their Java defaults (0 / false).
+     */
+    public RMPolicyBean () {
+        loadValuesFromPropertyFile ();
+    }
+
+    private void loadValuesFromPropertyFile () {
+        //TODO load policy values from the file.
+    }
+
+    /** @return the inactivity timeout interval */
+    public long getInactiveTimeoutInterval() {
+        return inactiveTimeoutInterval;
+    }
+
+    /** @return the acknowledgement interval */
+    public long getAcknowledgementInterval() {
+        return acknowledgementInterval;
+    }
+
+    /**
+     * Misspelled legacy accessor, kept so that existing callers continue to
+     * compile.
+     *
+     * @return the acknowledgement interval
+     * @deprecated use {@link #getAcknowledgementInterval()} instead
+     */
+    public long getAcknowledgementInaterval() {
+        return getAcknowledgementInterval();
+    }
+
+    /** @return the retransmission interval */
+    public long getRetransmissionInterval() {
+        return retransmissionInterval;
+    }
+
+    /** @return whether exponential backoff is enabled */
+    public boolean isExponentialBackoff() {
+        return exponentialBackoff;
+    }
+
+    public void setExponentialBackoff(boolean exponentialBackoff) {
+        this.exponentialBackoff = exponentialBackoff;
+    }
+
+    public void setRetransmissionInterval(long retransmissionInterval) {
+        this.retransmissionInterval = retransmissionInterval;
+    }
+
+    public void setInactiveTimeoutInterval(long inactiveTimeoutInterval) {
+        this.inactiveTimeoutInterval = inactiveTimeoutInterval;
+    }
+
+    public void setAcknowledgementInterval(long acknowledgementInterval) {
+        this.acknowledgementInterval = acknowledgementInterval;
+    }
+
+    /** @return the maximum retransmission count */
+    public int getMaximumRetransmissionCount() {
+        return maximumRetransmissionCount;
+    }
+
+    public void setMaximumRetransmissionCount(int maximumRetransmissionCount) {
+        this.maximumRetransmissionCount = maximumRetransmissionCount;
+    }
+
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyExtension.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyExtension.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyExtension.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyExtension.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,28 @@
+package org.apache.sandesha2.policy;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.modules.PolicyExtension;
+import org.apache.ws.policy.Policy;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public class RMPolicyExtension implements PolicyExtension {
+
+    /**
+     * Appends a "reliableMessagingMethods" element to the given element. The
+     * new element lists the RM methods as empty child elements, in the order
+     * createSequence, setLastMessage, endSequence.
+     *
+     * @param document factory for the new elements
+     * @param element  parent that receives the "reliableMessagingMethods" element
+     * @param opName   not used by this implementation
+     * @param policy   not used by this implementation
+     */
+    public void addMethodsToStub(Document document, Element element, QName opName, Policy policy) {
+
+        String[] methodNames = { "createSequence", "setLastMessage", "endSequence" };
+
+        Element container = document.createElement("reliableMessagingMethods");
+        for (int i = 0; i < methodNames.length; i++) {
+            container.appendChild(document.createElement(methodNames[i]));
+        }
+
+        element.appendChild(container);
+    }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,318 @@
+package org.apache.sandesha2.policy;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.processors.AcknowledgementIntervalProcessor;
+import org.apache.sandesha2.policy.processors.ExponentialBackoffProcessor;
+import org.apache.sandesha2.policy.processors.InactivityTimeoutMeasureProcessor;
+import org.apache.sandesha2.policy.processors.InactivityTimeoutProcessor;
+import org.apache.sandesha2.policy.processors.InvokeInOrderProcessor;
+import org.apache.sandesha2.policy.processors.MaximumRetransmissionCountProcessor;
+import org.apache.sandesha2.policy.processors.MessageTypesToDropProcessor;
+import org.apache.sandesha2.policy.processors.RetransmissionItervalProcessor;
+import org.apache.sandesha2.policy.processors.StorageManagersProcessor;
+import org.apache.ws.policy.All;
+import org.apache.ws.policy.Assertion;
+import org.apache.ws.policy.ExactlyOne;
+import org.apache.ws.policy.Policy;
+import org.apache.ws.policy.PrimitiveAssertion;
+import org.apache.ws.policy.util.PolicyFactory;
+import org.apache.ws.policy.util.PolicyReader;
+
+/**
+ * Walks a policy and feeds every Sandesha2 policy assertion to the token
+ * processor registered for it.
+ *
+ * Processing is organised as nested "transactions": startPolicyTransaction
+ * pushes a token and a copy of the current PolicyEngineData onto the stacks
+ * held by the RMProcessorContext, and exactly one of commitPolicyTransaction
+ * or abortPolicyTransaction removes them again.
+ */
+public class RMPolicyProcessor {
+
+    private static final Log logger = LogFactory.getLog(RMPolicyProcessor.class);
+
+    /** Namespace a primitive assertion must carry to be processed here. */
+    private static final String SANDESHA2_POLICY_NAMESPACE = "http://ws.apache.org/sandesha2/policy";
+
+    PolicyReader prdr = null;
+
+    RMPolicyToken topLevel = new RMPolicyToken("_TopLevel_",
+            RMPolicyToken.COMPLEX_TOKEN, null);
+
+    RMProcessorContext rmProcessorContext = null;
+
+    /**
+     * Registers one token per supported assertion below the top level token
+     * and creates a fresh processor context holding the top level token and
+     * an initial PolicyEngineData.
+     *
+     * @return always true
+     * @throws NoSuchMethodException if a processor lacks the handler method
+     *             expected for its token
+     */
+    public boolean setup() throws NoSuchMethodException {
+        prdr = PolicyFactory.getPolicyReader(PolicyFactory.OM_POLICY_READER);
+
+        registerTopLevelToken(RMPolicy.retransmissionIterval, new RetransmissionItervalProcessor());
+        registerTopLevelToken(RMPolicy.acknowledgementInterval, new AcknowledgementIntervalProcessor());
+        registerTopLevelToken(RMPolicy.maximumRetransmissionCount, new MaximumRetransmissionCountProcessor());
+        registerTopLevelToken(RMPolicy.exponentialBackoff, new ExponentialBackoffProcessor());
+        registerTopLevelToken(RMPolicy.inactiveTimeoutMeasure, new InactivityTimeoutMeasureProcessor());
+        registerTopLevelToken(RMPolicy.inactiveTimeout, new InactivityTimeoutProcessor());
+        registerTopLevelToken(RMPolicy.invokeInOrder, new InvokeInOrderProcessor());
+        registerTopLevelToken(RMPolicy.messageTypeToDrop, new MessageTypesToDropProcessor());
+        registerTopLevelToken(RMPolicy.storageManagers, new StorageManagersProcessor());
+
+        /*
+         * Now get the initial PolicyEngineData, initialize it and put it onto
+         * the PED stack.
+         */
+        PolicyEngineData ped = new PolicyEngineData();
+        ped.initializeWithDefaults();
+
+        /*
+         * Now get a context and push the top level token onto the token stack.
+         * The top level token is a special token that acts as anchor to start
+         * parsing.
+         */
+        rmProcessorContext = new RMProcessorContext();
+        rmProcessorContext.pushRMToken(topLevel);
+        rmProcessorContext.pushPolicyEngineData(ped);
+
+        return true;
+    }
+
+    /**
+     * Copies the template token, binds the handler to the copy and registers
+     * the copy as a child of the top level token.
+     */
+    private void registerTopLevelToken(RMPolicyToken template, Object handler)
+            throws NoSuchMethodException {
+        RMPolicyToken rpt = template.copy();
+        rpt.setProcessTokenMethod(handler);
+        topLevel.setChildToken(rpt);
+    }
+
+    /**
+     * This method takes a policy object, normalizes it if necessary,
+     * processes it and returns true if all assertions of at least one
+     * alternative can be fulfilled.
+     *
+     * Each policy must be normalized according to the WS Policy framework
+     * specification. Therefore a policy has one child (wsp:ExactlyOne) that
+     * contains one or more other terms (alternatives). To match the policy
+     * one of these alternatives must match. If none of the contained
+     * alternatives match, this policy cannot be enforced.
+     *
+     * Assertions that are not primitive assertions, or that are not in the
+     * Sandesha2 policy namespace, are skipped and do not affect the result.
+     *
+     * @param policy
+     *            The policy to process
+     * @return True if this policy can be enforced by the policy enforcement
+     *         implementation
+     */
+    public boolean processPolicy(Policy policy) {
+        if (!policy.isNormalized()) {
+            policy = (Policy) policy.normalize();
+        }
+
+        ExactlyOne xor = (ExactlyOne) policy.getTerms().get(0);
+        List listOfPolicyAlternatives = xor.getTerms();
+
+        boolean success = false;
+        int numberOfAlternatives = listOfPolicyAlternatives.size();
+
+        // Stop at the first alternative whose assertions can all be fulfilled.
+        for (int i = 0; !success && i < numberOfAlternatives; i++) {
+            All aPolicyAlternative = (All) listOfPolicyAlternatives.get(i);
+
+            List listOfAssertions = aPolicyAlternative.getTerms();
+
+            Iterator iterator = listOfAssertions.iterator();
+            /*
+             * Loop over all assertions in this alternative. If all assertions
+             * can be fulfilled then we choose this alternative and signal a
+             * success.
+             */
+            boolean all = true;
+            while (all && iterator.hasNext()) {
+                Assertion assertion = (Assertion) iterator.next();
+
+                /*
+                 * At this point we expect PrimitiveAssertions only.
+                 */
+                if (!(assertion instanceof PrimitiveAssertion)) {
+                    logger.debug("Got a unexpected assertion type: "
+                            + assertion.getClass().getName());
+                    continue;
+                }
+                /*
+                 * We need to pick only the primitive assertions which contain
+                 * a v1_0 policy assertion. For that we'll check the namespace
+                 * of the primitive assertion
+                 */
+                PrimitiveAssertion pa = (PrimitiveAssertion) assertion;
+                if (!(pa.getName().getNamespaceURI()
+                        .equals(SANDESHA2_POLICY_NAMESPACE))) {
+                    logger.debug("Got a unexpected assertion: "
+                            + pa.getName().getLocalPart());
+                    continue;
+                }
+                all = processPrimitiveAssertion(pa);
+            }
+            /*
+             * copy the status of assertion processing. If all is true then
+             * this alternative is "success"ful
+             */
+            success = all;
+        }
+        return success;
+    }
+
+    /**
+     * Runs one assertion as a transaction: start it, process its nested
+     * terms, then commit if everything succeeded or abort otherwise.
+     *
+     * @return true if the assertion and all of its nested terms succeeded
+     */
+    boolean processPrimitiveAssertion(PrimitiveAssertion pa) {
+        boolean commit = startPolicyTransaction(pa);
+
+        List terms = pa.getTerms();
+        for (int i = 0; commit && i < terms.size(); i++) {
+            Assertion assertion = (Assertion) terms.get(i);
+            if (assertion instanceof Policy) {
+                commit = processPolicy((Policy) assertion);
+            } else if (assertion instanceof PrimitiveAssertion) {
+                commit = processPrimitiveAssertion((PrimitiveAssertion) assertion);
+            }
+        }
+
+        if (commit) {
+            commitPolicyTransaction(pa);
+        } else {
+            abortPolicyTransaction(pa);
+        }
+        return commit;
+    }
+
+    /**
+     * Opens a transaction for the given assertion. The child token matching
+     * the assertion name (possibly null when the name is unknown) and a copy
+     * of the current PolicyEngineData are pushed onto the context stacks, and
+     * the token's processing method is invoked with action START.
+     *
+     * The pushes happen even when the token is unknown, so the caller must
+     * always follow up with commitPolicyTransaction or
+     * abortPolicyTransaction.
+     *
+     * @return false if the token is unknown below the current token, if the
+     *         processing method returned false, or if it threw an exception
+     * @throws IllegalStateException if the token stack has no current token
+     */
+    public boolean startPolicyTransaction(PrimitiveAssertion pa) {
+        String tokenName = pa.getName().getLocalPart();
+
+        /*
+         * Get the current rm policy token from the context and check if the
+         * current token supports/contains this assertion as token. If yes set
+         * this token as current token (push onto stack), set the assertion
+         * into context and call the processing method for this token.
+         */
+        RMPolicyToken currentToken = rmProcessorContext.readCurrentRMToken();
+        if (currentToken == null) {
+            // A library must not terminate the JVM; report the broken
+            // invariant to the caller instead.
+            String message = "Internal error on token stack - No current token";
+            logger.error(message);
+            throw new IllegalStateException(message);
+        }
+        RMPolicyToken rmpt = currentToken.getChildToken(tokenName);
+        rmProcessorContext.pushRMToken(rmpt);
+        rmProcessorContext.setAssertion(pa);
+        rmProcessorContext.setAction(RMProcessorContext.START);
+
+        /*
+         * Get the current state of the PolicyEngineData, make a copy of it and
+         * push the copy onto the PED stack. The token method works on this
+         * copy, adding its data.
+         */
+        PolicyEngineData ped = rmProcessorContext.readCurrentPolicyEngineData();
+        ped = ped.copy();
+        rmProcessorContext.pushPolicyEngineData(ped);
+        if (rmpt == null) {
+            logger.debug("RM token: '" + tokenName
+                    + "' unknown in context of '"
+                    + currentToken.getTokenName() + "'");
+            return false;
+        }
+
+        boolean ret = false;
+        try {
+            ret = rmpt.invokeProcessTokenMethod(rmProcessorContext);
+        } catch (Exception ex) {
+            logger.error("Exception occured when invoking processTokenMethod",
+                    ex);
+        } finally {
+            rmProcessorContext.setAction(RMProcessorContext.NONE);
+        }
+        return ret;
+    }
+
+    /**
+     * Rolls back the transaction opened by startPolicyTransaction: the
+     * current token's processing method is invoked with action ABORT (when
+     * there is a current token), and both the token and the PolicyEngineData
+     * copy are removed from their stacks.
+     */
+    public void abortPolicyTransaction(PrimitiveAssertion pa) {
+        RMPolicyToken currentToken = rmProcessorContext.readCurrentRMToken();
+        if (currentToken == null) {
+            logger.debug("Abort transaction because of unknown token: '"
+                    + pa.getName().getLocalPart() + "'");
+
+            rmProcessorContext.popRMToken();
+            // startPolicyTransaction pushes a PolicyEngineData copy before it
+            // detects the unknown token, so it has to be removed here as well
+            // to keep both stacks balanced.
+            // NOTE(review): assumes the null read above is the placeholder
+            // pushed by startPolicyTransaction - confirm against
+            // RMProcessorContext.
+            rmProcessorContext.popPolicyEngineData();
+            return;
+        }
+
+        rmProcessorContext.setAssertion(pa);
+        rmProcessorContext.setAction(RMProcessorContext.ABORT);
+        try {
+            currentToken.invokeProcessTokenMethod(rmProcessorContext);
+        } catch (Exception ex) {
+            logger.error("Exception occured when invoking processTokenMethod:",
+                    ex);
+        } finally {
+            rmProcessorContext.setAction(RMProcessorContext.NONE);
+            rmProcessorContext.popRMToken();
+            rmProcessorContext.popPolicyEngineData();
+        }
+    }
+
+    /**
+     * Completes the transaction opened by startPolicyTransaction: the current
+     * token's processing method is invoked with action COMMIT, the token is
+     * popped and the PolicyEngineData is committed on the context.
+     *
+     * @throws IllegalStateException if the token stack has no current token
+     */
+    public void commitPolicyTransaction(PrimitiveAssertion pa) {
+        RMPolicyToken currentToken = rmProcessorContext.readCurrentRMToken();
+        if (currentToken == null) {
+            // A library must not terminate the JVM; report the broken
+            // invariant to the caller instead.
+            String message = "Internal error on token stack - Commiting an unknown token: '"
+                    + pa.getName().getLocalPart() + "'";
+            logger.error(message);
+            throw new IllegalStateException(message);
+        }
+        rmProcessorContext.setAssertion(pa);
+        rmProcessorContext.setAction(RMProcessorContext.COMMIT);
+        try {
+            currentToken.invokeProcessTokenMethod(rmProcessorContext);
+        } catch (IllegalArgumentException e) {
+            logger.error("Exception occured when invoking processTokenMethod:", e);
+        } catch (IllegalAccessException e) {
+            logger.error("Exception occured when invoking processTokenMethod:", e);
+        } catch (InvocationTargetException e) {
+            logger.error("Exception occured when invoking processTokenMethod:", e);
+        } finally {
+            rmProcessorContext.setAction(RMProcessorContext.NONE);
+            rmProcessorContext.popRMToken();
+            rmProcessorContext.commitPolicyEngineData();
+        }
+    }
+
+    /** @return the processor context currently in use (null before setup) */
+    public RMProcessorContext getContext() {
+        return rmProcessorContext;
+    }
+
+    /** Replaces the processor context used by all transaction methods. */
+    public void setContext(RMProcessorContext rmProcessorContext) {
+        this.rmProcessorContext = rmProcessorContext;
+    }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyToken.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyToken.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyToken.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMPolicyToken.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,140 @@
+package org.apache.sandesha2.policy;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+
+
+public class RMPolicyToken {
+ /**
+ * The following values describe the type of the rm pocliy token. A complex
+ * token starts a transaction because it contains nested tokens. A simple
+ * token does not contain nested tokens but stands allone an defines a
+ * simple assertion or property.
+ *
+ * If Content is set then this token contains additional text content, e.g.
+ * XPath expressions.
+ */
+ public static final int COMPLEX_TOKEN = 1;
+
+ public static final int SIMPLE_TOKEN = 2;
+
+ public static final int WITH_CONTENT = 100;
+
+ private String tokenName;
+
+ private int tokenType = 0;
+
+ // private boolean supported = false;
+
+ private String[] attributes = null;
+
+ private Object handler = null;
+
+ private Method processTokenMethod = null;
+
+ private ArrayList childTokens = null;
+
+ private static final Log logger = LogFactory.getLog(RMPolicyToken.class);
+
+ public RMPolicyToken(String token, int type, String[] attribs,
+ Object h) throws SandeshaException, NoSuchMethodException {
+ this(token, type, attribs);
+ setProcessTokenMethod(h);
+ }
+
+ public RMPolicyToken(String token, int type, String[] attribs) {
+ tokenName = token;
+ tokenType = type;
+ attributes = attribs;
+
+ if (tokenType == COMPLEX_TOKEN) {
+ childTokens = new ArrayList();
+ }
+ }
+
+ /**
+ * @return Returns the attributes.
+ */
+ public String[] getAttributes() {
+ return attributes;
+ }
+
+ public void setProcessTokenMethod(Object h) throws NoSuchMethodException {
+
+ if (h == null) {
+ return;
+ }
+ handler = h;
+ Class handlerCls = h.getClass();
+ Class[] parameters = new Class[] { RMProcessorContext.class };
+
+ processTokenMethod = handlerCls.getDeclaredMethod("do" + tokenName,
+ parameters);
+ }
+
+ public boolean invokeProcessTokenMethod(RMProcessorContext spc)
+ throws IllegalArgumentException, IllegalAccessException,
+ InvocationTargetException {
+
+ if (processTokenMethod == null) {
+ return false;
+ }
+ Object[] parameter = new Object[] { spc };
+ Object ret = processTokenMethod.invoke(handler, parameter);
+ Boolean bool;
+ if (ret instanceof Boolean) {
+ bool = (Boolean) ret;
+ return bool.booleanValue();
+ }
+ return false;
+ }
+
+ public String getTokenName() {
+ return tokenName;
+ }
+
+ public void setChildToken(RMPolicyToken spt) {
+ childTokens.add(spt);
+ }
+
+ public RMPolicyToken getChildToken(String sptName) {
+ Iterator it = childTokens.iterator();
+ while (it.hasNext()) {
+ RMPolicyToken tmpSpt = (RMPolicyToken) it.next();
+ if (sptName.equals(tmpSpt.getTokenName())) {
+ return tmpSpt;
+ }
+ }
+ return null;
+ }
+
+
+ public void removeChildToken(String sptName) {
+ Iterator it = childTokens.iterator();
+ while (it.hasNext()) {
+ RMPolicyToken tmpSpt = (RMPolicyToken) it.next();
+ if (sptName.equals(tmpSpt.getTokenName())) {
+ childTokens.remove(tmpSpt);
+ return;
+ }
+ }
+ }
+
+ public RMPolicyToken copy() {
+ RMPolicyToken spt = new RMPolicyToken(tokenName, tokenType,
+ attributes);
+ if (childTokens != null) {
+ Iterator it = childTokens.iterator();
+ while (it.hasNext()) {
+ RMPolicyToken tmpSpt = (RMPolicyToken) it.next();
+ spt.setChildToken(tmpSpt);
+ }
+ }
+ return spt;
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMProcessorContext.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMProcessorContext.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMProcessorContext.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/RMProcessorContext.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,107 @@
+package org.apache.sandesha2.policy;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ws.policy.PrimitiveAssertion;
+
+
+public class RMProcessorContext {
+ public static final int NONE = 0;
+ public static final int START = 1;
+ public static final int COMMIT = 2;
+ public static final int ABORT = 3;
+
+ public static final String[] ACTION_NAMES = new String[]{"NONE", "START", "COMMIT", "ABORT"};
+
+ private ArrayList tokenStack = new ArrayList();
+
+ private int tokenStackPointer = 0;
+
+ private ArrayList pedStack = new ArrayList();
+
+ private int pedStackPointer = 0;
+
+ private PrimitiveAssertion assertion = null;
+
+ private int action = NONE;
+
+ private static final Log logger = LogFactory.getLog(RMProcessorContext.class);
+
+ public RMProcessorContext() {
+ }
+
+ public int getAction() {
+ return action;
+ }
+
+ public void setAction(int act) {
+ this.action = act;
+ }
+
+ public PrimitiveAssertion getAssertion() {
+ return assertion;
+ }
+
+ public void setAssertion(PrimitiveAssertion asrt) {
+ this.assertion = asrt;
+ }
+
+ public void pushRMToken(RMPolicyToken spt) {
+ tokenStack.add(tokenStackPointer, spt);
+ tokenStackPointer++;
+ }
+
+ public RMPolicyToken popRMToken() {
+ if (tokenStackPointer > 0) {
+ tokenStackPointer--;
+ return (RMPolicyToken) tokenStack.get(tokenStackPointer);
+ } else {
+ return null;
+ }
+ }
+
+ public RMPolicyToken readCurrentRMToken() {
+ if (tokenStackPointer > 0) {
+ return (RMPolicyToken) tokenStack.get(tokenStackPointer - 1);
+ } else {
+ return null;
+ }
+ }
+
+ public void pushPolicyEngineData(PolicyEngineData ped) {
+ pedStack.add(pedStackPointer, ped);
+ pedStackPointer++;
+ }
+
+ public PolicyEngineData popPolicyEngineData() {
+ if (pedStackPointer > 0) {
+ pedStackPointer--;
+ return (PolicyEngineData) pedStack.get(pedStackPointer);
+ } else {
+ return null;
+ }
+ }
+
+ public PolicyEngineData readCurrentPolicyEngineData() {
+ if (pedStackPointer > 0) {
+ return (PolicyEngineData) pedStack.get(pedStackPointer - 1);
+ } else {
+ return null;
+ }
+ }
+
+ public PolicyEngineData commitPolicyEngineData() {
+ if (pedStackPointer > 1) {
+ pedStackPointer--;
+ PolicyEngineData ped = (PolicyEngineData) pedStack.get(pedStackPointer);
+ pedStackPointer--;
+ pedStack.add(pedStackPointer, ped);
+ pedStackPointer++;
+ return ped;
+ } else {
+ return null;
+ }
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/AcknowledgementIntervalProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/AcknowledgementIntervalProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/AcknowledgementIntervalProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/AcknowledgementIntervalProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,53 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+public class AcknowledgementIntervalProcessor {
+
+ private boolean initializedAcknowledgementInterval = false;
+
+ private Log logger = LogFactory.getLog(this.getClass().getName());
+
+ public void initializeAcknowledgementIterval(RMPolicyToken rmpt)
+ throws NoSuchMethodException {
+
+ }
+
+ public Object doAcknowledgementInterval(RMProcessorContext rmpc) {
+ RMPolicyToken rmpt = rmpc.readCurrentRMToken();
+
+ switch (rmpc.getAction()) {
+
+ case RMProcessorContext.START:
+ if (!initializedAcknowledgementInterval) {
+ try {
+ initializeAcknowledgementIterval(rmpt);
+ initializedAcknowledgementInterval = true;
+ } catch (NoSuchMethodException e) {
+ logger.error("AcknowledgementIntervalProcessor:doAcknowledgementInterval", e);
+ return new Boolean(false);
+ }
+ }
+ logger.debug(rmpt.getTokenName());
+
+ case RMProcessorContext.COMMIT:
+
+ // //////////
+
+ PolicyEngineData engineData = rmpc.readCurrentPolicyEngineData();
+ String txt = rmpc.getAssertion().getStrValue();
+ engineData.setAcknowledgementInterval(Long.parseLong(txt.trim()));
+
+ // /////////////////////////////////
+
+ break;
+ case RMProcessorContext.ABORT:
+ break;
+ }
+ return new Boolean(true);
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/ExponentialBackoffProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/ExponentialBackoffProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/ExponentialBackoffProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/ExponentialBackoffProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,53 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+public class ExponentialBackoffProcessor {
+
+ private boolean initializedExponentialBackoff = false;
+
+ private Log logger = LogFactory.getLog(this.getClass().getName());
+
+ public void initializeExponentialBackoff(RMPolicyToken rmpt)
+ throws NoSuchMethodException {
+
+ }
+
+ public Object doExponentialBackoff(RMProcessorContext rmpc) {
+
+ RMPolicyToken rmpt = rmpc.readCurrentRMToken();
+ switch (rmpc.getAction()) {
+
+ case RMProcessorContext.START:
+ if (!initializedExponentialBackoff) {
+ try {
+ initializeExponentialBackoff(rmpt);
+ initializedExponentialBackoff = true;
+ } catch (NoSuchMethodException e) {
+ logger.error("Exception occured when invoking processTokenMethod", e);
+ return new Boolean(false);
+ }
+ }
+ logger.debug(rmpt.getTokenName());
+
+ case RMProcessorContext.COMMIT:
+
+ // ///////
+
+ PolicyEngineData ped = rmpc.readCurrentPolicyEngineData();
+ String text = rmpc.getAssertion().getStrValue();
+ ped.setExponentialBackoff(new Boolean(text.trim()).booleanValue());
+
+ // ///////
+
+ break;
+ case RMProcessorContext.ABORT:
+ break;
+ }
+ return new Boolean(true);
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutMeasureProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutMeasureProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutMeasureProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutMeasureProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,51 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+/**
+ * Token processor that copies the text value of the current assertion into
+ * the inactivity timeout measure of the current policy engine data.
+ */
+public class InactivityTimeoutMeasureProcessor {
+    private boolean initializedInactivityTimeoutMeasure = false;
+    private Log logger = LogFactory.getLog(this.getClass().getName());
+
+    /**
+     * Initialisation hook, run once on the first START action. Currently
+     * does nothing.
+     */
+    public void initializeInactivityTimeoutMeasure(RMPolicyToken rmpt)
+            throws NoSuchMethodException {
+        // nothing to set up
+    }
+
+    /**
+     * Handles the action stored in the given context.
+     *
+     * START runs the one-time initialisation, logs the token name and then
+     * performs the same work as COMMIT. COMMIT stores the assertion text
+     * unchanged. Any other action does nothing.
+     *
+     * @param rmpc the processor context
+     * @return Boolean false if initialisation failed, Boolean true otherwise
+     */
+    public Object doInactivityTimeoutMeasure(RMProcessorContext rmpc) {
+        final RMPolicyToken currentToken = rmpc.readCurrentRMToken();
+        final int action = rmpc.getAction();
+
+        if (action == RMProcessorContext.START) {
+            if (!initializedInactivityTimeoutMeasure) {
+                try {
+                    initializeInactivityTimeoutMeasure(currentToken);
+                    initializedInactivityTimeoutMeasure = true;
+                } catch (NoSuchMethodException e) {
+                    logger.error("Exception occured when initializeInactivityTimeoutMeasure", e);
+                    return Boolean.FALSE;
+                }
+            }
+            logger.debug(currentToken.getTokenName());
+        }
+
+        // START deliberately shares the COMMIT work (the original switch
+        // had no break between the two cases).
+        if (action == RMProcessorContext.START || action == RMProcessorContext.COMMIT) {
+            PolicyEngineData target = rmpc.readCurrentPolicyEngineData();
+            String measure = rmpc.getAssertion().getStrValue();
+            target.setInactivityTimeoutMeassure(measure);
+        }
+
+        return Boolean.TRUE;
+    }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InactivityTimeoutProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,51 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+
+/**
+ * Token processor that copies the text value of the current assertion into
+ * the inactivity timeout of the current policy engine data.
+ */
+public class InactivityTimeoutProcessor {
+    private boolean initializedInactivityTimeout = false;
+    private Log logger = LogFactory.getLog(this.getClass().getName());
+
+    /**
+     * Initialisation hook, run once on the first START action. Currently
+     * does nothing.
+     */
+    public void initializeInactivityTimeout(RMPolicyToken spt)
+            throws NoSuchMethodException {
+
+    }
+
+    /**
+     * Handles the action stored in the given context.
+     *
+     * START runs the one-time initialisation, logs the token name and then
+     * falls through into COMMIT. COMMIT parses the trimmed assertion text
+     * as a long and stores it. ABORT does nothing.
+     *
+     * @param rmpc the processor context
+     * @return Boolean false if initialisation failed, Boolean true otherwise
+     * @throws NumberFormatException if the assertion text is not a valid long
+     */
+    public Object doInactivityTimeout(RMProcessorContext rmpc) {
+
+        RMPolicyToken rmpt = rmpc.readCurrentRMToken();
+        switch (rmpc.getAction()) {
+
+        case RMProcessorContext.START:
+            if (!initializedInactivityTimeout) {
+                try {
+                    initializeInactivityTimeout(rmpt);
+                    initializedInactivityTimeout = true;
+                } catch (NoSuchMethodException e) {
+                    logger.error("Exception occured in initializeInactivityTimeout", e);
+                    return Boolean.FALSE;
+                }
+            }
+            logger.debug(rmpt.getTokenName());
+            // NOTE(review): no break here, so START also runs the COMMIT
+            // work; presumably intentional, confirm against the engine.
+
+        case RMProcessorContext.COMMIT:
+
+            PolicyEngineData ped = rmpc.readCurrentPolicyEngineData();
+            String text = rmpc.getAssertion().getStrValue();
+            // Trim like the other numeric processors do, so surrounding
+            // whitespace in the policy text does not break parsing. A null
+            // text still reaches parseLong and fails there as before.
+            if (text != null) {
+                text = text.trim();
+            }
+            ped.setInactivityTimeout(Long.parseLong(text));
+
+            break;
+        case RMProcessorContext.ABORT:
+            break;
+        }
+        return Boolean.TRUE;
+    }
+
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InvokeInOrderProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InvokeInOrderProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InvokeInOrderProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/InvokeInOrderProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,45 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+
+public class InvokeInOrderProcessor {
+ private boolean initializedInvokeInOrder = false;
+ private Log logger = LogFactory.getLog(this.getClass().getName());
+
+ public void initializeInvokeInOrder(RMPolicyToken spt)
+ throws NoSuchMethodException {
+ }
+
+ public Object doInvokeInOrder(RMProcessorContext rmpc) {
+ RMPolicyToken rmpt = rmpc.readCurrentRMToken();
+ switch (rmpc.getAction()) {
+
+ case RMProcessorContext.START:
+ if (!initializedInvokeInOrder) {
+ try {
+ initializeInvokeInOrder(rmpt);
+ initializedInvokeInOrder = true;
+ } catch (NoSuchMethodException e) {
+ logger.error("Exception occured in initializeInvokeInOrder", e);
+ return new Boolean(false);
+ }
+ }
+ logger.debug(rmpt.getTokenName());
+
+ case RMProcessorContext.COMMIT:
+
+ PolicyEngineData ped = rmpc.readCurrentPolicyEngineData();
+ String text = rmpc.getAssertion().getStrValue();
+ ped.setInvokeInOrder(new Boolean(text).booleanValue());
+ break;
+ case RMProcessorContext.ABORT:
+ break;
+ }
+ return new Boolean(true);
+ }
+}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/MaximumRetransmissionCountProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/MaximumRetransmissionCountProcessor.java?rev=414476&view=auto
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/MaximumRetransmissionCountProcessor.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/processors/MaximumRetransmissionCountProcessor.java Wed Jun 14 22:51:15 2006
@@ -0,0 +1,52 @@
+package org.apache.sandesha2.policy.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.policy.PolicyEngineData;
+import org.apache.sandesha2.policy.RMPolicyToken;
+import org.apache.sandesha2.policy.RMProcessorContext;
+
+/**
+ * Token processor that copies the text value of the current assertion into
+ * the maximum retransmission count of the current policy engine data.
+ */
+public class MaximumRetransmissionCountProcessor {
+
+    private boolean initializedMaximumRetransmissionCount = false;
+
+    private Log logger = LogFactory.getLog(this.getClass().getName());
+
+    /**
+     * Initialisation hook, run once on the first START action. Currently
+     * does nothing.
+     */
+    public void initializeMaximumRetransmissionCount(RMPolicyToken rmpt)
+            throws NoSuchMethodException {
+    }
+
+    /**
+     * Handles the action stored in the given context.
+     *
+     * START runs the one-time initialisation, logs the token name and then
+     * falls through into COMMIT. COMMIT parses the trimmed assertion text
+     * as an int and stores it. ABORT does nothing.
+     *
+     * @param rmpc the processor context
+     * @return Boolean false if initialisation failed, Boolean true otherwise
+     * @throws NumberFormatException if the assertion text is not a valid int
+     */
+    public Object doMaximumRetransmissionCount(RMProcessorContext rmpc) {
+        RMPolicyToken rmpt = rmpc.readCurrentRMToken();
+
+        switch (rmpc.getAction()) {
+
+        case RMProcessorContext.START:
+            if (!initializedMaximumRetransmissionCount) {
+                try {
+                    initializeMaximumRetransmissionCount(rmpt);
+                    initializedMaximumRetransmissionCount = true;
+                } catch (NoSuchMethodException e) {
+                    // Name this method in the log entry, not the
+                    // acknowledgement interval processor's.
+                    logger.error("MaximumRetransmissionCountProcessor:doMaximumRetransmissionCount", e);
+                    return Boolean.FALSE;
+                }
+            }
+            logger.debug(rmpt.getTokenName());
+            // NOTE(review): no break here, so START also runs the COMMIT
+            // work; presumably intentional, confirm against the engine.
+
+        case RMProcessorContext.COMMIT:
+
+            PolicyEngineData engineData = rmpc.readCurrentPolicyEngineData();
+            String txt = rmpc.getAssertion().getStrValue();
+            engineData.setMaximumRetransmissionCount(Integer.parseInt(txt.trim()));
+
+            break;
+        case RMProcessorContext.ABORT:
+            break;
+        }
+        return Boolean.TRUE;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org