You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2005/07/02 10:38:54 UTC
cvs commit: ws-sandesha/src/org/apache/sandesha/storage/queue IncomingSequence.java OutgoingSequence.java SandeshaQueue.java
jaliya 2005/07/02 01:38:54
Modified: src/org/apache/sandesha IStorageManager.java
src/org/apache/sandesha/client ClientStorageManager.java
src/org/apache/sandesha/server RMInvokerWorker.java
ServerStorageManager.java
src/org/apache/sandesha/storage/dao ISandeshaDAO.java
SandeshaDatabaseDAO.java SandeshaQueueDAO.java
src/org/apache/sandesha/storage/queue IncomingSequence.java
OutgoingSequence.java SandeshaQueue.java
Log:
Fixed a synchronization issue in the RMInvokerWorker
Revision Changes Path
1.29 +3 -1 ws-sandesha/src/org/apache/sandesha/IStorageManager.java
Index: IStorageManager.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/IStorageManager.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -r1.28 -r1.29
--- IStorageManager.java 2 Jul 2005 04:59:53 -0000 1.28
+++ IStorageManager.java 2 Jul 2005 08:38:53 -0000 1.29
@@ -31,7 +31,9 @@
boolean isResponseSequenceExist(String sequenceID);
- RMMessageContext getNextMessageToProcess();
+ Object getNextSeqToProcess();
+
+ RMMessageContext getNextMessageToProcess(Object seq);
void setAcknowledged(String seqID, long msgNumber);
1.46 +8 -0 ws-sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -r1.45 -r1.46
--- ClientStorageManager.java 2 Jul 2005 04:59:54 -0000 1.45
+++ ClientStorageManager.java 2 Jul 2005 08:38:53 -0000 1.46
@@ -56,6 +56,14 @@
return accessor.isIncomingSequenceExists(sequenceID);
}
+ public Object getNextSeqToProcess() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public RMMessageContext getNextMessageToProcess(Object seq) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
/**
* This will be used to inform the client about the presence of the response
* message. But will be impemented later.
1.6 +64 -62 ws-sandesha/src/org/apache/sandesha/server/RMInvokerWorker.java
Index: RMInvokerWorker.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/server/RMInvokerWorker.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- RMInvokerWorker.java 2 Jul 2005 06:06:32 -0000 1.5
+++ RMInvokerWorker.java 2 Jul 2005 08:38:54 -0000 1.6
@@ -40,7 +40,6 @@
private IStorageManager storageManager;
private static final Log log = LogFactory.getLog(RMInvokerWorker.class.getName());
private static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- private static Object lock = new Object();
public RMInvokerWorker() {
setStorageManager(new ServerStorageManager());
@@ -56,79 +55,26 @@
}
- public void run() {
+ public void run() {
while (true) {
- try {
- Thread.sleep(Constants.RMINVOKER_SLEEP_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- synchronized(lock){
try {
+ Thread.sleep(Constants.RMINVOKER_SLEEP_TIME);
- RMMessageContext rmMessageContext = getStorageManager().getNextMessageToProcess();
- if (rmMessageContext != null) {
- AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders();
- boolean isVoid = doRealInvoke(rmMessageContext.getMsgContext());
-
- if (!isVoid) {
-
- String oldAction = rmMessageContext.getAddressingHeaders().getAction()
- .toString();
- rmMessageContext.getAddressingHeaders().setAction(oldAction + Constants.RESPONSE);
- if (rmMessageContext.isLastMessage()) {
- //Insert Terminate Sequnce.
- if (addrHeaders.getReplyTo() != null) {
- String replyTo = addrHeaders.getReplyTo().getAddress().toString();
- RMMessageContext terminateMsg = RMMessageCreator.createTerminateSeqMsg(rmMessageContext, Constants.SERVER);
- terminateMsg.setOutGoingAddress(replyTo);
- getStorageManager().insertTerminateSeqMessage(terminateMsg);
- } else {
- RMInvokerWorker.log.error(Constants.ErrorMessages.CANNOT_SEND_THE_TERMINATE_SEQ);
- }
- }
- //Store the message in the response queue. If there is an application
- // response then that response is always sent using a new HTTP connection
- // and the <replyTo> header is used in this case. This is done by the
- // RMSender.
- rmMessageContext.setMessageType(Constants.MSG_TYPE_SERVICE_RESPONSE);
-
- boolean hasResponseSeq = getStorageManager().isResponseSequenceExist(rmMessageContext.getSequenceID());
- boolean firstMsgOfResponseSeq = false;
- if (!(hasResponseSeq && rmMessageContext.getRMHeaders().getSequence()
- .getMessageNumber().getMessageNumber() == 1)) {
- firstMsgOfResponseSeq = !hasResponseSeq;
- }
-
- rmMessageContext.setMsgNumber(getStorageManager().getNextMessageNumber(rmMessageContext.getSequenceID()));
- getStorageManager().insertOutgoingMessage(rmMessageContext);
-
-
- if (firstMsgOfResponseSeq) {
- String msgIdStr = Constants.UUID + RMInvokerWorker.uuidGen.nextUUID();
-
- RMMessageContext csRMMsgCtx = RMMessageCreator.createCreateSeqMsg(rmMessageContext, Constants.SERVER, msgIdStr, null);
- csRMMsgCtx.setOutGoingAddress(rmMessageContext.getAddressingHeaders()
- .getReplyTo().getAddress().toString());
-
- csRMMsgCtx.addToMsgIdList(msgIdStr);
- csRMMsgCtx.setMessageID(msgIdStr);
-
- getStorageManager().setTemporaryOutSequence(csRMMsgCtx.getSequenceID(),
- msgIdStr);
- getStorageManager().addCreateSequenceRequest(csRMMsgCtx);
- }
- }
+ Object seq = getStorageManager().getNextSeqToProcess();
+ synchronized (seq) {
+ RMMessageContext rmMessageContext = getStorageManager().getNextMessageToProcess(seq);
+ doWork(rmMessageContext);
}
+
+
} catch (InterruptedException error) {
RMInvokerWorker.log.error(error);
} catch (Exception error) {
RMInvokerWorker.log.error(error);
}
}
- }
}
/**
@@ -144,4 +90,60 @@
protected IStorageManager getStorageManager() {
return storageManager;
}
+
+ protected void doWork(RMMessageContext rmMessageContext) throws Exception {
+ if (rmMessageContext != null) {
+ AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders();
+ boolean isVoid = doRealInvoke(rmMessageContext.getMsgContext());
+
+ if (!isVoid) {
+
+ String oldAction = rmMessageContext.getAddressingHeaders().getAction()
+ .toString();
+ rmMessageContext.getAddressingHeaders().setAction(oldAction + Constants.RESPONSE);
+ if (rmMessageContext.isLastMessage()) {
+ //Insert Terminate Sequnce.
+ if (addrHeaders.getReplyTo() != null) {
+ String replyTo = addrHeaders.getReplyTo().getAddress().toString();
+ RMMessageContext terminateMsg = RMMessageCreator.createTerminateSeqMsg(rmMessageContext, Constants.SERVER);
+ terminateMsg.setOutGoingAddress(replyTo);
+ getStorageManager().insertTerminateSeqMessage(terminateMsg);
+ } else {
+ RMInvokerWorker.log.error(Constants.ErrorMessages.CANNOT_SEND_THE_TERMINATE_SEQ);
+ }
+ }
+ //Store the message in the response queue. If there is an application
+ // response then that response is always sent using a new HTTP connection
+ // and the <replyTo> header is used in this case. This is done by the
+ // RMSender.
+ rmMessageContext.setMessageType(Constants.MSG_TYPE_SERVICE_RESPONSE);
+
+ boolean hasResponseSeq = getStorageManager().isResponseSequenceExist(rmMessageContext.getSequenceID());
+ boolean firstMsgOfResponseSeq = false;
+ if (!(hasResponseSeq && rmMessageContext.getRMHeaders().getSequence()
+ .getMessageNumber().getMessageNumber() == 1)) {
+ firstMsgOfResponseSeq = !hasResponseSeq;
+ }
+
+ rmMessageContext.setMsgNumber(getStorageManager().getNextMessageNumber(rmMessageContext.getSequenceID()));
+ getStorageManager().insertOutgoingMessage(rmMessageContext);
+
+
+ if (firstMsgOfResponseSeq) {
+ String msgIdStr = Constants.UUID + RMInvokerWorker.uuidGen.nextUUID();
+
+ RMMessageContext csRMMsgCtx = RMMessageCreator.createCreateSeqMsg(rmMessageContext, Constants.SERVER, msgIdStr, null);
+ csRMMsgCtx.setOutGoingAddress(rmMessageContext.getAddressingHeaders()
+ .getReplyTo().getAddress().toString());
+
+ csRMMsgCtx.addToMsgIdList(msgIdStr);
+ csRMMsgCtx.setMessageID(msgIdStr);
+
+ getStorageManager().setTemporaryOutSequence(csRMMsgCtx.getSequenceID(),
+ msgIdStr);
+ getStorageManager().addCreateSequenceRequest(csRMMsgCtx);
+ }
+ }
+ }
+ }
}
1.37 +15 -9 ws-sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -r1.36 -r1.37
--- ServerStorageManager.java 8 Jun 2005 08:58:32 -0000 1.36
+++ ServerStorageManager.java 2 Jul 2005 08:38:54 -0000 1.37
@@ -58,6 +58,7 @@
Constants.SERVER);
}
+
/**
* A very important method. Makes life easy for the thread or thread pool
* that is using this. Every thread just have to create an instance of
@@ -66,19 +67,19 @@
* same sequence id. But if that doesnt hv processable messages it will go for
* a new sequence.
*/
- public RMMessageContext getNextMessageToProcess() {
- if (tempSeqId == null)
- tempSeqId = accessor.getRandomSeqIdToProcess();
+ public RMMessageContext getNextMessageToProcess(Object seq) {
+// if (tempSeqId == null)
+// tempSeqId = accessor.getRandomSeqIdToProcess();
- if (tempSeqId == null)
+ if (seq == null)
return null;
- RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
+ RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq);
- if (nextMsg == null) {
- tempSeqId = accessor.getRandomSeqIdToProcess();
- nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
- }
+// if (nextMsg == null) {
+// tempSeqId = accessor.getRandomSeqIdToProcess();
+// nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
+// }
return nextMsg;
}
@@ -106,6 +107,11 @@
return accessor.isOutgoingSequenceExists(sequenceID);
}
+ public Object getNextSeqToProcess() {
+ return accessor.getRandomSeqToProcess();
+ }
+
+
/**
* This is used to get a random message from the out queue Basically server
* sender will use this.
1.18 +3 -1 ws-sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
Index: ISandeshaDAO.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- ISandeshaDAO.java 2 Jul 2005 04:59:55 -0000 1.17
+++ ISandeshaDAO.java 2 Jul 2005 08:38:54 -0000 1.18
@@ -96,7 +96,7 @@
* This tries to get the next message to be sent from the given outgoing sequence
* If these is no message to be sent in the given sequence, null will be returned.
*/
- RMMessageContext getNextMsgContextToProcess(String sequenceId);
+ RMMessageContext getNextMsgContextToProcess(Object seq);
/**
@@ -104,6 +104,8 @@
*/
RMMessageContext getNextOutgoingMsgContextToSend();
+ public Object getRandomSeqToProcess();
+
/**
* This is used to randomize the process of message sending. Otherwise messages of some
* sequence will not be sent while some other sequences will very easily be able to send all
1.17 +9 -5 ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
Index: SandeshaDatabaseDAO.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SandeshaDatabaseDAO.java 8 Jun 2005 08:58:32 -0000 1.16
+++ SandeshaDatabaseDAO.java 2 Jul 2005 08:38:54 -0000 1.17
@@ -85,11 +85,7 @@
return false;
}
- public RMMessageContext getNextMsgContextToProcess(String sequenceId) {
- // TODO Auto-generated method stub
- return null;
- }
-
+
public String getRandomSeqIdToProcess() {
// TODO Auto-generated method stub
return null;
@@ -105,6 +101,10 @@
return false;
}
+ public RMMessageContext getNextMsgContextToProcess(Object seq) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean addMessageToOutgoingSequence(String sequenceId,
RMMessageContext rmMessageContext) {
// TODO Auto-generated method stub
@@ -126,6 +126,10 @@
return null;
}
+ public Object getRandomSeqToProcess() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isOutgoingSequenceExists(String sequenceId) {
// TODO Auto-generated method stub
return false;
1.18 +16 -3 ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
Index: SandeshaQueueDAO.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- SandeshaQueueDAO.java 8 Jun 2005 08:58:32 -0000 1.17
+++ SandeshaQueueDAO.java 2 Jul 2005 08:38:54 -0000 1.18
@@ -97,11 +97,12 @@
return exists;
}
- public RMMessageContext getNextMsgContextToProcess(String sequenceId) {
+ public RMMessageContext getNextMsgContextToProcess(Object sequence) {
+
SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
RMMessageContext msg = null;
try {
- msg = sq.nextIncomingMessageToProcess(sequenceId);
+ msg = sq.nextIncomingMessageToProcess(sequence);
} catch (Exception e) {
SandeshaQueueDAO.log.error(e);
e.printStackTrace();
@@ -109,8 +110,20 @@
return msg;
}
+ public Object getRandomSeqToProcess(){
+ SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
+ Vector seqs= sq.nextAllSeqsToProcess();
+ int size = seqs.size();
+ if (size <= 0)
+ return null;
+ Random r = new Random();
+ int number = r.nextInt(size);
+
+ return seqs.get(number);
+ }
+
+
public String getRandomSeqIdToProcess() {
- // TODO Auto-generated method stub
SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
Vector ids = sq.nextAllSeqIdsToProcess();
int size = ids.size();
1.11 +1 -9 ws-sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
Index: IncomingSequence.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- IncomingSequence.java 8 Jun 2005 08:58:32 -0000 1.10
+++ IncomingSequence.java 2 Jul 2005 08:38:54 -0000 1.11
@@ -31,11 +31,10 @@
* @author Jaliya Ekanayaka
*/
-public class IncomingSequence {
+public class IncomingSequence extends AbstractSequence{
private long lastProcessed;
private boolean hasProcessableMessages;
- private String sequenceId;
private HashMap hash;
private boolean beingProcessedLock = false; //When true messages are
private long lastMsgNo = -1;
@@ -89,13 +88,6 @@
return hasProcessableMessages;
}
- public String getSequenceId() {
- return sequenceId;
- }
-
- public void setSequenceId(String sequenceId) {
- this.sequenceId = sequenceId;
- }
/**
* adds the message to map. Also adds a record to cache if needed.
1.13 +1 -5 ws-sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java
Index: OutgoingSequence.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- OutgoingSequence.java 10 Jun 2005 11:46:37 -0000 1.12
+++ OutgoingSequence.java 2 Jul 2005 08:38:54 -0000 1.13
@@ -37,9 +37,8 @@
* This class works as a hash map for storing response messages until they are
* sent.
*/
-public class OutgoingSequence {
+public class OutgoingSequence extends AbstractSequence {
- private String sequenceId;
private String outSequenceId;
private boolean outSeqApproved;
private HashMap hash;
@@ -80,9 +79,6 @@
* public boolean hasMessagesToSend(){ return hasMessagesToSend; }
*/
- public String getSequenceId() {
- return sequenceId;
- }
public boolean isOutSeqApproved() {
return outSeqApproved;
1.28 +21 -3 ws-sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -r1.27 -r1.28
--- SandeshaQueue.java 2 Jul 2005 04:59:56 -0000 1.27
+++ SandeshaQueue.java 2 Jul 2005 08:38:54 -0000 1.28
@@ -147,11 +147,13 @@
}
}
- public RMMessageContext nextIncomingMessageToProcess(String sequenceId) throws QueueException {
- if (sequenceId == null)
+ public RMMessageContext nextIncomingMessageToProcess(Object sequence) throws QueueException {
+ if (sequence == null)
return null;
- IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
+ AbstractSequence absSeq=(AbstractSequence)sequence;
+
+ IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId());
synchronized (sh) {
if (sh == null)
throw new QueueException(Constants.Queue.SEQUENCE_ABSENT);
@@ -338,6 +340,22 @@
}
}
+ public Vector nextAllSeqsToProcess() {
+ Vector seqs = new Vector();
+
+ synchronized (incomingMap) {
+ Iterator it = incomingMap.keySet().iterator();
+
+ while (it.hasNext()) {
+ Object tempKey = it.next();
+ IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
+ if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
+ seqs.add(sh);
+ }
+ return seqs;
+ }
+ }
+
public Vector nextAllSeqIdsToProcess() {
Vector ids = new Vector();
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org