You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/09/27 04:58:24 UTC
svn commit: r291809 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/
storage/beanmanagers/ storage/beans/ util/
Author: chamikara
Date: Mon Sep 26 19:57:56 2005
New Revision: 291809
URL: http://svn.apache.org/viewcvs?rev=291809&view=rev
Log:
Following message exchange is working for the echo case.
--> create seq
<-- async create seq response
--> application msg 1
<-- sync ack
--> terminate seq.
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/ServerOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Sep 26 19:57:56 2005
@@ -87,6 +87,9 @@
String ADDRESS = "Address";
}
+ public interface WSP {
+ long RETRANSMISSION_INTERVAL = 3000;
+ }
String RM_HEADERS = "rmHeaders";
String SEQUENCE = "sequence";
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java Mon Sep 26 19:57:56 2005
@@ -26,6 +26,7 @@
import org.apache.axis2.addressing.om.AddressingHeaders;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.om.OMAbstractFactory;
import org.apache.axis2.om.impl.MIMEOutputUtils;
import org.apache.axis2.soap.SOAPEnvelope;
@@ -59,61 +60,69 @@
public class RMMsgCreator {
- public static RMMsgContext createCreateSeqMsg(RMMsgContext applicationMsg)
+ public static RMMsgContext createCreateSeqMsg(RMMsgContext applicationRMMsg)
throws SandeshaException {
- ConfigurationContext context = applicationMsg.getMessageContext()
+ ConfigurationContext context = applicationRMMsg.getMessageContext()
.getSystemContext();
if (context == null)
throw new SandeshaException("Configuration Context is null");
MessageContext msgContext;
try {
- msgContext = new MessageContext(context,applicationMsg.getMessageContext().getTransportIn(),
- applicationMsg.getMessageContext().getTransportOut());
+ msgContext = new MessageContext(context, applicationRMMsg
+ .getMessageContext().getTransportIn(), applicationRMMsg
+ .getMessageContext().getTransportOut());
} catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
+ throw new SandeshaException(e.getMessage());
}
- msgContext.setTo(applicationMsg.getTo());
- msgContext.setReplyTo(applicationMsg.getReplyTo());
-
- RMMsgContext createSeqMsg = new RMMsgContext(msgContext);
+ msgContext.setTo(applicationRMMsg.getTo());
+ msgContext.setReplyTo(applicationRMMsg.getReplyTo());
+
+ RMMsgContext createSeqRMMsg = new RMMsgContext(msgContext);
CreateSequence createSequencePart = new CreateSequence();
-
+
//TODO correct below. Set a correct acksTo value
- EndpointReference acksToEPR = applicationMsg.getReplyTo();
-
- createSequencePart.setAcksTo (new AcksTo (new Address (acksToEPR)));
- createSeqMsg.setMessagePart(Constants.MESSAGE_PART_CREATE_SEQ,
+ EndpointReference acksToEPR = applicationRMMsg.getReplyTo();
+
+ createSequencePart.setAcksTo(new AcksTo(new Address(acksToEPR)));
+ createSeqRMMsg.setMessagePart(Constants.MESSAGE_PART_CREATE_SEQ,
createSequencePart);
-
+
try {
- createSeqMsg.addSOAPEnvelope();
+ createSeqRMMsg.addSOAPEnvelope();
} catch (AxisFault e1) {
- throw new SandeshaException (e1.getMessage());
+ throw new SandeshaException(e1.getMessage());
}
- createSeqMsg.setAction(Constants.WSRM.ACTION_CREATE_SEQ);
- EndpointReference to = applicationMsg.getTo();
+ createSeqRMMsg.setAction(Constants.WSRM.ACTION_CREATE_SEQ);
+ EndpointReference to = applicationRMMsg.getTo();
if (to == null || to.getAddress() == null || to.getAddress() == null
|| to.getAddress() == "")
throw new SandeshaException(
"To value of the Application Message is not set correctly");
- createSeqMsg.setTo(to);
+ createSeqRMMsg.setTo(to);
- EndpointReference replyTo = applicationMsg.getReplyTo();
+ EndpointReference replyTo = applicationRMMsg.getReplyTo();
if (replyTo == null || replyTo.getAddress() == null
|| replyTo.getAddress() == null || to.getAddress() == "")
throw new SandeshaException(
"ReplyTo value of the Application Message is not set correctly");
- createSeqMsg.setTo(replyTo);
- createSeqMsg.setReplyTo(replyTo);
+ createSeqRMMsg.setReplyTo(replyTo);
+ createSeqRMMsg.setMessageId(SandeshaUtil.getUUID());
+
+ MessageContext createSeqMsg = createSeqRMMsg.getMessageContext();
+ MessageContext applicationMsg = applicationRMMsg.getMessageContext();
+ createSeqMsg.setServiceGroupContext(applicationMsg
+ .getServiceGroupContext());
+ createSeqMsg.setServiceGroupContextId(applicationMsg
+ .getServiceGroupContextId());
+ createSeqMsg.setServiceContext(applicationMsg.getServiceContext());
+ createSeqMsg.setServiceContextID(applicationMsg.getServiceContextID());
- createSeqMsg.setMessageId(SandeshaUtil.getUUID());
-
- return createSeqMsg;
+ return createSeqRMMsg;
}
public static RMMsgContext createCreateSeqResponseMsg(
@@ -163,7 +172,7 @@
//Adds a ack message to the following message.
public static void addAckMessage(RMMsgContext applicationMsg)
- throws SandeshaException {
+ throws SandeshaException {
SOAPEnvelope envelope = applicationMsg.getSOAPEnvelope();
if (envelope == null) {
SOAPEnvelope newEnvelope = SOAPAbstractFactory.getSOAPFactory(
@@ -173,28 +182,27 @@
envelope = applicationMsg.getSOAPEnvelope();
MessageContext requestMessage = null;
-
+
try {
requestMessage = applicationMsg.getMessageContext()
.getOperationContext().getMessageContext(
WSDLConstants.MESSAGE_LABEL_IN);
} catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
+ throw new SandeshaException(e.getMessage());
}
- if (requestMessage==null)
- throw new SandeshaException ("Request message is null");
-
- RMMsgContext reqRMMsgCtx = null;
+ if (requestMessage == null)
+ throw new SandeshaException("Request message is null");
+ RMMsgContext reqRMMsgCtx = null;
reqRMMsgCtx = MsgInitializer.initializeMessage(requestMessage);
-
Sequence reqSequence = (Sequence) reqRMMsgCtx
.getMessagePart(Constants.MESSAGE_PART_SEQUENCE);
if (reqSequence == null)
- throw new SandeshaException("Sequence part of application message is null");
+ throw new SandeshaException(
+ "Sequence part of application message is null");
String sequenceId = reqSequence.getIdentifier().getIdentifier();
@@ -234,18 +242,39 @@
applicationMsg.setMessageId(SandeshaUtil.getUUID());
}
-
- public static RMMsgContext createAckMessage (RMMsgContext applicationMsg) throws SandeshaException {
+
+ public static RMMsgContext createAckMessage(RMMsgContext applicationMsg)
+ throws SandeshaException {
try {
- MessageContext applicationMsgCtx = applicationMsg.getMessageContext();
- MessageContext ackMsgCtx = new MessageContext (applicationMsgCtx.getSystemContext(),
- applicationMsgCtx.getTransportIn(),applicationMsgCtx.getTransportOut());
- RMMsgContext ackRMMsgCtx = new RMMsgContext (ackMsgCtx);
- ackRMMsgCtx.getMessageContext().setOperationContext(applicationMsgCtx.getOperationContext());
+ MessageContext applicationMsgCtx = applicationMsg
+ .getMessageContext();
+ MessageContext ackMsgCtx = SandeshaUtil
+ .copyMessageContext(applicationMsgCtx);
+ ackMsgCtx.setServiceGroupContext(applicationMsgCtx
+ .getServiceGroupContext());
+ ackMsgCtx.setServiceGroupContextId(applicationMsgCtx
+ .getServiceGroupContextId());
+ ackMsgCtx.setServiceContext(applicationMsgCtx.getServiceContext());
+ ackMsgCtx.setServiceContextID(applicationMsgCtx
+ .getServiceContextID());
+ RMMsgContext ackRMMsgCtx = new RMMsgContext(ackMsgCtx);
+
+ OperationContext ackOpCtx = new OperationContext(ackMsgCtx
+ .getOperationDescription());
+ MessageContext requestAppMsg = applicationMsgCtx
+ .getOperationContext().getMessageContext(
+ WSDLConstants.MESSAGE_LABEL_IN);
+
+ //added request applicatin message as the request message for the newly created ack.
+ //this helps to tread addAck and createAck cases equally.
+ ackOpCtx.addMessageContext(requestAppMsg);
+ ackMsgCtx.setOperationContext(ackOpCtx);
+ ackOpCtx.addMessageContext(ackMsgCtx);
+
addAckMessage(ackRMMsgCtx);
return ackRMMsgCtx;
} catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
- }
+ throw new SandeshaException(e.getMessage());
+ }
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java Mon Sep 26 19:57:56 2005
@@ -1,17 +1,22 @@
/*
- * Created on Sep 5, 2005
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * TODO To change the template for this generated file go to
- * Window - Preferences - Java - Code Style - Code Templates
*/
+
package org.apache.sandesha2;
-/**
- * @author chamikara
- *
- * TODO To change the template for this generated type comment go to
- * Window - Preferences - Java - Code Style - Code Templates
- */
public class SandeshaException extends Exception {
public SandeshaException (String message) {
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java?rev=291809&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java Mon Sep 26 19:57:56 2005
@@ -0,0 +1,86 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.sandesha2;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+public class Sender extends Thread {
+
+ private boolean senderStarted = false;
+ private ConfigurationContext context = null;
+
+ public synchronized void stopSender () {
+ senderStarted = false;
+ }
+
+ public synchronized boolean isSenderStarted () {
+ return senderStarted;
+ }
+
+ public void run () {
+
+ while (senderStarted) {
+ try {
+ if (context==null)
+ throw new SandeshaException ("Can't continue the Sender. Context is null");
+ } catch (SandeshaException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ RetransmitterBeanMgr mgr = AbstractBeanMgrFactory.getInstance(context).getRetransmitterBeanMgr();
+ Collection coll = mgr.findMsgsToSend();
+ Iterator iter = coll.iterator();
+ while (iter.hasNext()) {
+ RetransmitterBean bean = (RetransmitterBean) iter.next();
+ String key = (String) bean.getKey();
+ MessageContext msgCtx = SandeshaUtil.getStoredMessageContext(key);
+ try {
+ new AxisEngine(context).send(msgCtx);
+ } catch (AxisFault e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ }
+
+ public void start (ConfigurationContext context) {
+ System.out.println ("Starting the sender......");
+ senderStarted = true;
+ this.context = context;
+ super.start();
+ }
+
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/ServerOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/ServerOutHandler.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/ServerOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/ServerOutHandler.java Mon Sep 26 19:57:56 2005
@@ -23,9 +23,11 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clientapi.InOutMEPClient;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.axis2.soap.SOAPBody;
import org.apache.axis2.soap.SOAPEnvelope;
@@ -58,8 +60,14 @@
public class ServerOutHandler extends AbstractHandler {
public void invoke(MessageContext msgCtx) throws AxisFault {
+ //log
System.out.println("In server OutHandler");
+ //Strating the sender.
+ ConfigurationContext ctx = msgCtx.getSystemContext();
+ SandeshaUtil.startSenderIfStopped(ctx);
+
+ //getting rm message
RMMsgContext rmMsgCtx;
try {
rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
@@ -67,37 +75,21 @@
throw new AxisFault("Cant initialize the message");
}
- //getting the request message.
+ //getting the request message and rmMessage.
MessageContext reqMsgCtx = msgCtx.getOperationContext()
.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN);
RMMsgContext requestRMMsgCtx;
try {
requestRMMsgCtx = MsgInitializer.initializeMessage(reqMsgCtx);
- } catch (SandeshaException ex) {
- throw new AxisFault("Cant initialize the message");
- }
-
- // try {
- // if (requestRMMsgCtx.getMessageType() !=
- // Constants.MESSAGE_TYPE_CREATE_SEQ)
- // RMMsgCreator.addAckMessage(rmMsgCtx);
- // } catch (SandeshaException e) {
- // throw new AxisFault (e.getMessage());
- // }
-
- try {
if (rmMsgCtx.getMessageType() == Constants.MESSAGE_TYPE_UNKNOWN) {
- //This is a possible response message.
- System.out.println("GOT Possible Response Message");
- //RMMsgCreator.addAckMessage(rmMsgCtx);
+ System.out.println("GOT Possible Response Message");
AbstractContext context = rmMsgCtx.getContext();
if (context == null)
throw new SandeshaException("Context is null");
+
Sequence sequence = (Sequence) requestRMMsgCtx
.getMessagePart(Constants.MESSAGE_PART_SEQUENCE);
- //check weather the seq is is available.
-
if (sequence == null)
throw new SandeshaException("Sequence part is null");
@@ -109,7 +101,6 @@
.getInstance(context).getSequencePropretyBeanMgr();
SequencePropertyBean acksToBean = seqPropMgr.retrieve(
incomingSeqId, Constants.SEQ_PROPERTY_ACKS_TO_EPR);
-
if (acksToBean == null
|| acksToBean.getValue() == null
|| !(acksToBean.getValue() instanceof EndpointReference))
@@ -128,7 +119,6 @@
.getDefaultEnvelope();
rmMsgCtx.setSOAPEnvelop(envelope);
}
-
SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
if (soapBody == null)
throw new SandeshaException(
@@ -139,63 +129,44 @@
validResponse = true;
if (!validResponse) { //TODO either change MsgReceiver or move
- // if code to in handler.
if (Constants.WSA.NS_URI_ANONYMOUS.equals(acksToEPR
.getAddress())) {
RMMsgCreator.addAckMessage(rmMsgCtx);
}
} else {
//valid response
-
+
RMMsgContext ackRMMsgContext = RMMsgCreator
.createAckMessage(rmMsgCtx);
- //SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
- ackRMMsgContext.getSOAPEnvelope();
-
+ MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
+ ackMsgContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+ ackMsgContext.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
+ ackMsgContext.setServiceContext(msgCtx.getServiceContext());
+ ackMsgContext.setServiceContextID(msgCtx.getServiceContextID());
+ OperationContext ackOpContext = new OperationContext (ackMsgContext.getOperationDescription());
+ ackOpContext.addMessageContext(ackMsgContext);
+ ackMsgContext.setOperationContext(ackOpContext);
RMMsgContext newRMMsgCtx = SandeshaUtil
.copyRMMessageContext(rmMsgCtx);
+ MessageContext newMsgCtx = newRMMsgCtx.getMessageContext();
rmMsgCtx.setSOAPEnvelop(ackRMMsgContext.getSOAPEnvelope());
+ //setting contexts
+ newMsgCtx.setServiceGroupContext(msgCtx.getServiceGroupContext());
+ newMsgCtx.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
+ newMsgCtx.setServiceContext(msgCtx.getServiceContext());
+ newMsgCtx.setServiceContextID(msgCtx.getServiceContextID());
+ OperationContext newOpContext = new OperationContext (newMsgCtx.getOperationDescription());
+ newOpContext.addMessageContext(newMsgCtx);
+ newMsgCtx.setOperationContext(newOpContext);
+
+ //processing the response
processResponseMessage(newRMMsgCtx, requestRMMsgCtx);
-
- SOAPEnvelope env1 = msgCtx.getEnvelope();
-
- try {
- XMLStreamWriter writer = XMLOutputFactory.newInstance()
- .createXMLStreamWriter(System.out);
- System.out.println ("Writing envelop");
- env1.serialize(writer);
- } catch (Exception ex) {
- ex.printStackTrace();
- }
-
}
-
- // if (acksToEPR.getAddress().equals(
- // Constants.WSA.NS_URI_ANONYMOUS)) {
- // RMMsgCreator.addAckMessage(rmMsgCtx);
- // } else {
- // RMMsgContext ackRMMessage = RMMsgCreator
- // .createAckMessage(rmMsgCtx);
- // //TODO add async ack to the retransmitter.
- // }
- // //processResponseMessage(rmMsgCtx, requestRMMsgCtx);
-
}
} catch (SandeshaException e) {
throw new AxisFault(e.getMessage());
}
-
- // SOAPEnvelope env = msgCtx.getEnvelope();
- //
- // try {
- // XMLStreamWriter writer = XMLOutputFactory.newInstance()
- // .createXMLStreamWriter(System.out);
- // //env.serialize(writer);
- // } catch (Exception ex) {
- // ex.printStackTrace();
- // }
-
}
private void processResponseMessage(RMMsgContext msg, RMMsgContext reqMsg)
@@ -205,8 +176,6 @@
Sequence sequence = (Sequence) reqMsg
.getMessagePart(Constants.MESSAGE_PART_SEQUENCE);
- //check weather the seq is is available.
-
if (sequence == null)
throw new SandeshaException("Sequence part is null");
@@ -218,7 +187,6 @@
if (context == null)
throw new SandeshaException("Context is null");
- System.out.println("INCOMING SEQUENCE ID:" + incomingSeqId);
SequencePropertyBeanMgr mgr = AbstractBeanMgrFactory.getInstance(
context).getSequencePropretyBeanMgr();
@@ -248,16 +216,7 @@
msg.setTo(incomingReplyTo);
msg.setReplyTo(incomingTo);
- // //TODO get following URL correctly.
- // String serverURL =
- // "http://localhost:8070/axis2/services/InteropService";
- // EndpointReference replyTo = new EndpointReference (serverURL);
- // msg.setReplyTo(replyTo);
-
if (outSequenceBean == null || outSequenceBean.getValue() == null) {
- //sequence id is not present
-
- //add msg to retransmitter with send=false;
RetransmitterBeanMgr retransmitterMgr = AbstractBeanMgrFactory
.getInstance(context).getRetransmitterBeanMgr();
@@ -271,7 +230,6 @@
appMsgEntry.setTempSequenceId(incomingSeqId);
appMsgEntry.setSend(false);
appMsgEntry.setMessageId(msg.getMessageId());
-
retransmitterMgr.insert(appMsgEntry);
addCreateSequenceMessage(msg);
@@ -280,16 +238,19 @@
//Sequence id is present
//set sequence part
//add message to retransmitter table with send=true;
-
}
}
- public void addCreateSequenceMessage(RMMsgContext applicationMsg)
+ public void addCreateSequenceMessage(RMMsgContext applicationRMMsg)
throws SandeshaException {
- RMMsgContext createSeqMessage = RMMsgCreator
- .createCreateSeqMsg(applicationMsg);
- AbstractContext context = applicationMsg.getContext();
+ MessageContext applicationMsg = applicationRMMsg.getMessageContext();
+ if (applicationMsg==null)
+ throw new SandeshaException ("Message context is null");
+ RMMsgContext createSeqRMMessage = RMMsgCreator
+ .createCreateSeqMsg(applicationRMMsg);
+ MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
+ AbstractContext context = applicationRMMsg.getContext();
if (context == null)
throw new SandeshaException("Context is null");
@@ -297,12 +258,12 @@
context).getStorageMapBeanMgr();
RetransmitterBeanMgr retransmitterMgr = AbstractBeanMgrFactory
.getInstance(context).getRetransmitterBeanMgr();
- String key = SandeshaUtil.storeMessageContext(createSeqMessage
+ String key = SandeshaUtil.storeMessageContext(createSeqRMMessage
.getMessageContext());
RetransmitterBean createSeqEntry = new RetransmitterBean();
createSeqEntry.setKey(key);
createSeqEntry.setLastSentTime(0);
- createSeqEntry.setMessageId(createSeqMessage.getMessageId());
+ createSeqEntry.setMessageId(createSeqRMMessage.getMessageId());
createSeqEntry.setSend(true);
retransmitterMgr.insert(createSeqEntry);
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java Mon Sep 26 19:57:56 2005
@@ -108,6 +108,26 @@
return beans;
}
+
+ public Collection findMsgsToSend () {
+ ArrayList beans = new ArrayList();
+ Iterator iterator = table.values().iterator();
+
+ RetransmitterBean temp;
+ while (iterator.hasNext()) {
+ temp = (RetransmitterBean) iterator.next();
+ if (temp.isSend()) {
+ long lastSentTime = temp.getLastSentTime();
+ int count = temp.getSentCount();
+ long timeNow = System.currentTimeMillis();
+ if (count==0 || (timeNow > lastSentTime+Constants.WSP.RETRANSMISSION_INTERVAL)) {
+ beans.add(temp);
+ }
+ }
+ }
+
+ return beans;
+ }
public boolean update(RetransmitterBean bean) {
return table.put(bean.getMessageId(), bean) != null;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java Mon Sep 26 19:57:56 2005
@@ -34,6 +34,8 @@
private String tempSequenceId;
+ private int sentCount = 0;
+
public RetransmitterBean() {
}
@@ -54,55 +56,30 @@
return key;
}
- /**
- * @param key
- * The key to set.
- */
public void setKey(String key) {
this.key = key;
}
- /**
- * @return Returns the lastSentTime.
- */
public long getLastSentTime() {
return LastSentTime;
}
- /**
- * @param lastSentTime
- * The lastSentTime to set.
- */
public void setLastSentTime(long lastSentTime) {
LastSentTime = lastSentTime;
}
- /**
- * @return Returns the messageId.
- */
public String getMessageId() {
return messageId;
}
- /**
- * @param messageId
- * The messageId to set.
- */
public void setMessageId(String messageId) {
this.messageId = messageId;
}
- /**
- * @return Returns the send.
- */
public boolean isSend() {
return Send;
}
- /**
- * @param send
- * The send to set.
- */
public void setSend(boolean send) {
this.Send = send;
}
@@ -114,4 +91,13 @@
public void setTempSequenceId(String tempSequenceId) {
this.tempSequenceId = tempSequenceId;
}
+
+ public int getSentCount() {
+ return sentCount;
+ }
+
+ public void setSentCount(int sentCount) {
+ this.sentCount = sentCount;
+ }
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=291809&r1=291808&r2=291809&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Sep 26 19:57:56 2005
@@ -32,6 +32,7 @@
import org.apache.axis2.util.UUIDGenerator;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.Sender;
import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
@@ -43,7 +44,8 @@
public class SandeshaUtil {
private static Hashtable storedMsgContexts = new Hashtable();
-
+ private static Sender sender = new Sender ();
+
public static String getUUID() {
String uuid = "uuid:" + UUIDGenerator.getUUID();
return uuid;
@@ -166,7 +168,7 @@
//newMessageContext.setInFaultFlow(msgCtx.geti);
newMessageContext.setMessageID(getUUID());
newMessageContext.setMessageInformationHeaders(msgCtx.getMessageInformationHeaders());
- newMessageContext.setOperationContext(msgCtx.getOperationContext());
+ //newMessageContext.setOperationContext(msgCtx.getOperationContext());
newMessageContext.setOperationDescription(msgCtx.getOperationDescription());
newMessageContext.setOutPutWritten(msgCtx.isOutPutWritten());
newMessageContext.setParent(msgCtx.getParent());
@@ -176,11 +178,11 @@
newMessageContext.setResponseWritten(msgCtx.isResponseWritten());
newMessageContext.setRestThroughPOST(msgCtx.isRestThroughPOST());
newMessageContext.setServerSide(msgCtx.isServerSide());
- newMessageContext.setServiceContext(msgCtx.getServiceContext());
- newMessageContext.setServiceContextID(msgCtx.getServiceContextID());
+ //newMessageContext.setServiceContext(msgCtx.getServiceContext());
+ //newMessageContext.setServiceContextID(msgCtx.getServiceContextID());
newMessageContext.setServiceDescription(msgCtx.getServiceDescription());
- newMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
- newMessageContext.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
+ //newMessageContext.setServiceGroupContext(msgCtx.getServiceGroupContext());
+ //newMessageContext.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
if (msgCtx.getServiceGroupDescription()!=null)
newMessageContext.setServiceGroupDescription(msgCtx.getServiceGroupDescription());
newMessageContext.setSoapAction(msgCtx.getSoapAction());
@@ -210,5 +212,11 @@
newRMMsgCtx.setMessageContext(msgCtx);
return newRMMsgCtx;
+ }
+
+ public static void startSenderIfStopped (ConfigurationContext context) {
+ if (!sender.isSenderStarted()) {
+ sender.start(context);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org