You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2005/07/02 10:38:54 UTC

cvs commit: ws-sandesha/src/org/apache/sandesha/storage/queue IncomingSequence.java OutgoingSequence.java SandeshaQueue.java

jaliya      2005/07/02 01:38:54

  Modified:    src/org/apache/sandesha IStorageManager.java
               src/org/apache/sandesha/client ClientStorageManager.java
               src/org/apache/sandesha/server RMInvokerWorker.java
                        ServerStorageManager.java
               src/org/apache/sandesha/storage/dao ISandeshaDAO.java
                        SandeshaDatabaseDAO.java SandeshaQueueDAO.java
               src/org/apache/sandesha/storage/queue IncomingSequence.java
                        OutgoingSequence.java SandeshaQueue.java
  Log:
  Fixed a synchronization issue in the RMInvokerWorker
  
  Revision  Changes    Path
  1.29      +3 -1      ws-sandesha/src/org/apache/sandesha/IStorageManager.java
  
  Index: IStorageManager.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/IStorageManager.java,v
  retrieving revision 1.28
  retrieving revision 1.29
  diff -u -r1.28 -r1.29
  --- IStorageManager.java	2 Jul 2005 04:59:53 -0000	1.28
  +++ IStorageManager.java	2 Jul 2005 08:38:53 -0000	1.29
  @@ -31,7 +31,9 @@
   
       boolean isResponseSequenceExist(String sequenceID);
   
  -    RMMessageContext getNextMessageToProcess();
  +    Object getNextSeqToProcess();
  +
  +    RMMessageContext getNextMessageToProcess(Object seq);
   
       void setAcknowledged(String seqID, long msgNumber);
   
  
  
  
  1.46      +8 -0      ws-sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
  
  Index: ClientStorageManager.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
  retrieving revision 1.45
  retrieving revision 1.46
  diff -u -r1.45 -r1.46
  --- ClientStorageManager.java	2 Jul 2005 04:59:54 -0000	1.45
  +++ ClientStorageManager.java	2 Jul 2005 08:38:53 -0000	1.46
  @@ -56,6 +56,14 @@
           return accessor.isIncomingSequenceExists(sequenceID);
       }
   
  +    public Object getNextSeqToProcess() {
  +        return null;  //To change body of implemented methods use File | Settings | File Templates.
  +    }
  +
  +    public RMMessageContext getNextMessageToProcess(Object seq) {
  +        return null;  //To change body of implemented methods use File | Settings | File Templates.
  +    }
  +
       /**
        * This will be used to inform the client about the presence of the response
        * message. But will be impemented later.
  
  
  
  1.6       +64 -62    ws-sandesha/src/org/apache/sandesha/server/RMInvokerWorker.java
  
  Index: RMInvokerWorker.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/server/RMInvokerWorker.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- RMInvokerWorker.java	2 Jul 2005 06:06:32 -0000	1.5
  +++ RMInvokerWorker.java	2 Jul 2005 08:38:54 -0000	1.6
  @@ -40,7 +40,6 @@
       private IStorageManager storageManager;
       private static final Log log = LogFactory.getLog(RMInvokerWorker.class.getName());
       private static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
  -    private static Object lock = new Object();
   
       public RMInvokerWorker() {
           setStorageManager(new ServerStorageManager());
  @@ -56,79 +55,26 @@
       }
   
   
  -    public  void run() {
  +    public void run() {
           while (true) {
  -            try {
  -                Thread.sleep(Constants.RMINVOKER_SLEEP_TIME);
  -            } catch (InterruptedException e) {
  -                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
  -            }
   
  -            synchronized(lock){
               try {
  +                Thread.sleep(Constants.RMINVOKER_SLEEP_TIME);
   
  -                RMMessageContext rmMessageContext = getStorageManager().getNextMessageToProcess();
   
  -                if (rmMessageContext != null) {
  -                    AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders();
  -                    boolean isVoid = doRealInvoke(rmMessageContext.getMsgContext());
  -
  -                    if (!isVoid) {
  -
  -                        String oldAction = rmMessageContext.getAddressingHeaders().getAction()
  -                                .toString();
  -                        rmMessageContext.getAddressingHeaders().setAction(oldAction + Constants.RESPONSE);
  -                        if (rmMessageContext.isLastMessage()) {
  -                            //Insert Terminate Sequnce.
  -                            if (addrHeaders.getReplyTo() != null) {
  -                                String replyTo = addrHeaders.getReplyTo().getAddress().toString();
  -                                RMMessageContext terminateMsg = RMMessageCreator.createTerminateSeqMsg(rmMessageContext, Constants.SERVER);
  -                                terminateMsg.setOutGoingAddress(replyTo);
  -                                getStorageManager().insertTerminateSeqMessage(terminateMsg);
  -                            } else {
  -                                RMInvokerWorker.log.error(Constants.ErrorMessages.CANNOT_SEND_THE_TERMINATE_SEQ);
  -                            }
  -                        }
  -                        //Store the message in the response queue. If there is an application
  -                        // response then that response is always sent using a new HTTP connection
  -                        // and the <replyTo> header is used in this case. This is done by the
  -                        // RMSender.
  -                        rmMessageContext.setMessageType(Constants.MSG_TYPE_SERVICE_RESPONSE);
  -
  -                        boolean hasResponseSeq = getStorageManager().isResponseSequenceExist(rmMessageContext.getSequenceID());
  -                        boolean firstMsgOfResponseSeq = false;
  -                        if (!(hasResponseSeq && rmMessageContext.getRMHeaders().getSequence()
  -                                .getMessageNumber().getMessageNumber() == 1)) {
  -                            firstMsgOfResponseSeq = !hasResponseSeq;
  -                        }
  -
  -                        rmMessageContext.setMsgNumber(getStorageManager().getNextMessageNumber(rmMessageContext.getSequenceID()));
  -                        getStorageManager().insertOutgoingMessage(rmMessageContext);
  -
  -
  -                        if (firstMsgOfResponseSeq) {
  -                            String msgIdStr = Constants.UUID + RMInvokerWorker.uuidGen.nextUUID();
  -
  -                            RMMessageContext csRMMsgCtx = RMMessageCreator.createCreateSeqMsg(rmMessageContext, Constants.SERVER, msgIdStr, null);
  -                            csRMMsgCtx.setOutGoingAddress(rmMessageContext.getAddressingHeaders()
  -                                    .getReplyTo().getAddress().toString());
  -
  -                            csRMMsgCtx.addToMsgIdList(msgIdStr);
  -                            csRMMsgCtx.setMessageID(msgIdStr);
  -
  -                            getStorageManager().setTemporaryOutSequence(csRMMsgCtx.getSequenceID(),
  -                                    msgIdStr);
  -                            getStorageManager().addCreateSequenceRequest(csRMMsgCtx);
  -                        }
  -                    }
  +                Object seq = getStorageManager().getNextSeqToProcess();
  +                synchronized (seq) {
  +                    RMMessageContext rmMessageContext = getStorageManager().getNextMessageToProcess(seq);
  +                    doWork(rmMessageContext);
                   }
  +
  +
               } catch (InterruptedException error) {
                   RMInvokerWorker.log.error(error);
               } catch (Exception error) {
                   RMInvokerWorker.log.error(error);
               }
           }
  -        }
       }
   
       /**
  @@ -144,4 +90,60 @@
       protected IStorageManager getStorageManager() {
           return storageManager;
       }
  +
  +    protected void doWork(RMMessageContext rmMessageContext) throws Exception {
  +        if (rmMessageContext != null) {
  +            AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders();
  +            boolean isVoid = doRealInvoke(rmMessageContext.getMsgContext());
  +
  +            if (!isVoid) {
  +
  +                String oldAction = rmMessageContext.getAddressingHeaders().getAction()
  +                        .toString();
  +                rmMessageContext.getAddressingHeaders().setAction(oldAction + Constants.RESPONSE);
  +                if (rmMessageContext.isLastMessage()) {
  +                    //Insert Terminate Sequnce.
  +                    if (addrHeaders.getReplyTo() != null) {
  +                        String replyTo = addrHeaders.getReplyTo().getAddress().toString();
  +                        RMMessageContext terminateMsg = RMMessageCreator.createTerminateSeqMsg(rmMessageContext, Constants.SERVER);
  +                        terminateMsg.setOutGoingAddress(replyTo);
  +                        getStorageManager().insertTerminateSeqMessage(terminateMsg);
  +                    } else {
  +                        RMInvokerWorker.log.error(Constants.ErrorMessages.CANNOT_SEND_THE_TERMINATE_SEQ);
  +                    }
  +                }
  +                //Store the message in the response queue. If there is an application
  +                // response then that response is always sent using a new HTTP connection
  +                // and the <replyTo> header is used in this case. This is done by the
  +                // RMSender.
  +                rmMessageContext.setMessageType(Constants.MSG_TYPE_SERVICE_RESPONSE);
  +
  +                boolean hasResponseSeq = getStorageManager().isResponseSequenceExist(rmMessageContext.getSequenceID());
  +                boolean firstMsgOfResponseSeq = false;
  +                if (!(hasResponseSeq && rmMessageContext.getRMHeaders().getSequence()
  +                        .getMessageNumber().getMessageNumber() == 1)) {
  +                    firstMsgOfResponseSeq = !hasResponseSeq;
  +                }
  +
  +                rmMessageContext.setMsgNumber(getStorageManager().getNextMessageNumber(rmMessageContext.getSequenceID()));
  +                getStorageManager().insertOutgoingMessage(rmMessageContext);
  +
  +
  +                if (firstMsgOfResponseSeq) {
  +                    String msgIdStr = Constants.UUID + RMInvokerWorker.uuidGen.nextUUID();
  +
  +                    RMMessageContext csRMMsgCtx = RMMessageCreator.createCreateSeqMsg(rmMessageContext, Constants.SERVER, msgIdStr, null);
  +                    csRMMsgCtx.setOutGoingAddress(rmMessageContext.getAddressingHeaders()
  +                            .getReplyTo().getAddress().toString());
  +
  +                    csRMMsgCtx.addToMsgIdList(msgIdStr);
  +                    csRMMsgCtx.setMessageID(msgIdStr);
  +
  +                    getStorageManager().setTemporaryOutSequence(csRMMsgCtx.getSequenceID(),
  +                            msgIdStr);
  +                    getStorageManager().addCreateSequenceRequest(csRMMsgCtx);
  +                }
  +            }
  +        }
  +    }
   }
  
  
  
  1.37      +15 -9     ws-sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
  
  Index: ServerStorageManager.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
  retrieving revision 1.36
  retrieving revision 1.37
  diff -u -r1.36 -r1.37
  --- ServerStorageManager.java	8 Jun 2005 08:58:32 -0000	1.36
  +++ ServerStorageManager.java	2 Jul 2005 08:38:54 -0000	1.37
  @@ -58,6 +58,7 @@
                           Constants.SERVER);
       }
   
  +
       /**
        * A very important method. Makes life easy for the thread or thread pool
        * that is using this. Every thread just have to create an instance of
  @@ -66,19 +67,19 @@
        * same sequence id. But if that doesnt hv processable messages it will go for
        * a new sequence.
        */
  -    public RMMessageContext getNextMessageToProcess() {
  -        if (tempSeqId == null)
  -            tempSeqId = accessor.getRandomSeqIdToProcess();
  + public RMMessageContext getNextMessageToProcess(Object seq) {
  +//        if (tempSeqId == null)
  +//            tempSeqId = accessor.getRandomSeqIdToProcess();
   
  -        if (tempSeqId == null)
  +        if (seq == null)
               return null;
   
  -        RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
  +        RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq);
   
  -        if (nextMsg == null) {
  -            tempSeqId = accessor.getRandomSeqIdToProcess();
  -            nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
  -        }
  +//        if (nextMsg == null) {
  +//            tempSeqId = accessor.getRandomSeqIdToProcess();
  +//            nextMsg = accessor.getNextMsgContextToProcess(tempSeqId);
  +//        }
   
           return nextMsg;
       }
  @@ -106,6 +107,11 @@
           return accessor.isOutgoingSequenceExists(sequenceID);
       }
   
  +    public Object getNextSeqToProcess() {
  +      return  accessor.getRandomSeqToProcess();
  +    }
  +
  +
       /**
        * This is used to get a random message from the out queue Basically server
        * sender will use this.
  
  
  
  1.18      +3 -1      ws-sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
  
  Index: ISandeshaDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- ISandeshaDAO.java	2 Jul 2005 04:59:55 -0000	1.17
  +++ ISandeshaDAO.java	2 Jul 2005 08:38:54 -0000	1.18
  @@ -96,7 +96,7 @@
        * This tries to get the next message to be sent from the given outgoing sequence
        * If these is no message to be sent in the given sequence, null will be returned.
        */
  -    RMMessageContext getNextMsgContextToProcess(String sequenceId);
  +    RMMessageContext getNextMsgContextToProcess(Object seq);
   
   
       /**
  @@ -104,6 +104,8 @@
        */
       RMMessageContext getNextOutgoingMsgContextToSend();
   
  +    public Object getRandomSeqToProcess();
  +
       /**
        * This is used to randomize the process of message sending. Otherwise messages of some
        * sequence will not be sent while some other sequences will very easily be able to send all
  
  
  
  1.17      +9 -5      ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
  
  Index: SandeshaDatabaseDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- SandeshaDatabaseDAO.java	8 Jun 2005 08:58:32 -0000	1.16
  +++ SandeshaDatabaseDAO.java	2 Jul 2005 08:38:54 -0000	1.17
  @@ -85,11 +85,7 @@
           return false;
       }
   
  -    public RMMessageContext getNextMsgContextToProcess(String sequenceId) {
  -        // TODO Auto-generated method stub
  -        return null;
  -    }
  -
  +  
       public String getRandomSeqIdToProcess() {
           // TODO Auto-generated method stub
           return null;
  @@ -105,6 +101,10 @@
           return false;
       }
   
  +    public RMMessageContext getNextMsgContextToProcess(Object seq) {
  +        return null;  //To change body of implemented methods use File | Settings | File Templates.
  +    }
  +
       public boolean addMessageToOutgoingSequence(String sequenceId,
                                                   RMMessageContext rmMessageContext) {
           // TODO Auto-generated method stub
  @@ -126,6 +126,10 @@
           return null;
       }
   
  +    public Object getRandomSeqToProcess() {
  +        return null;  //To change body of implemented methods use File | Settings | File Templates.
  +    }
  +
       public boolean isOutgoingSequenceExists(String sequenceId) {
           // TODO Auto-generated method stub
           return false;
  
  
  
  1.18      +16 -3     ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
  
  Index: SandeshaQueueDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- SandeshaQueueDAO.java	8 Jun 2005 08:58:32 -0000	1.17
  +++ SandeshaQueueDAO.java	2 Jul 2005 08:38:54 -0000	1.18
  @@ -97,11 +97,12 @@
           return exists;
       }
   
  -    public RMMessageContext getNextMsgContextToProcess(String sequenceId) {
  +    public RMMessageContext getNextMsgContextToProcess(Object sequence) {
  +
           SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
           RMMessageContext msg = null;
           try {
  -            msg = sq.nextIncomingMessageToProcess(sequenceId);
  +            msg = sq.nextIncomingMessageToProcess(sequence);
           } catch (Exception e) {
               SandeshaQueueDAO.log.error(e);
               e.printStackTrace();
  @@ -109,8 +110,20 @@
           return msg;
       }
   
  +    public Object getRandomSeqToProcess(){
  +         SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
  +         Vector seqs= sq.nextAllSeqsToProcess();
  +         int size = seqs.size();
  +        if (size <= 0)
  +            return null;
  +        Random r = new Random();
  +        int number = r.nextInt(size);
  +
  +         return seqs.get(number);
  +    }
  +
  +
       public String getRandomSeqIdToProcess() {
  -        // TODO Auto-generated method stub
           SandeshaQueue sq = SandeshaQueue.getInstance(endPoint);
           Vector ids = sq.nextAllSeqIdsToProcess();
           int size = ids.size();
  
  
  
  1.11      +1 -9      ws-sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
  
  Index: IncomingSequence.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- IncomingSequence.java	8 Jun 2005 08:58:32 -0000	1.10
  +++ IncomingSequence.java	2 Jul 2005 08:38:54 -0000	1.11
  @@ -31,11 +31,10 @@
    * @author Jaliya Ekanayaka
    */
   
  -public class IncomingSequence {
  +public class IncomingSequence extends AbstractSequence{
   
       private long lastProcessed;
       private boolean hasProcessableMessages;
  -    private String sequenceId;
       private HashMap hash;
       private boolean beingProcessedLock = false; //When true messages are
       private long lastMsgNo = -1;
  @@ -89,13 +88,6 @@
           return hasProcessableMessages;
       }
   
  -    public String getSequenceId() {
  -        return sequenceId;
  -    }
  -
  -    public void setSequenceId(String sequenceId) {
  -        this.sequenceId = sequenceId;
  -    }
   
       /**
        * adds the message to map. Also adds a record to cache if needed.
  
  
  
  1.13      +1 -5      ws-sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java
  
  Index: OutgoingSequence.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- OutgoingSequence.java	10 Jun 2005 11:46:37 -0000	1.12
  +++ OutgoingSequence.java	2 Jul 2005 08:38:54 -0000	1.13
  @@ -37,9 +37,8 @@
    * This class works as a hash map for storing response messages until they are
    * sent.
    */
  -public class OutgoingSequence {
  +public class OutgoingSequence extends AbstractSequence {
   
  -    private String sequenceId;
       private String outSequenceId;
       private boolean outSeqApproved;
       private HashMap hash;
  @@ -80,9 +79,6 @@
        * public boolean hasMessagesToSend(){ return hasMessagesToSend; }
        */
   
  -    public String getSequenceId() {
  -        return sequenceId;
  -    }
   
       public boolean isOutSeqApproved() {
           return outSeqApproved;
  
  
  
  1.28      +21 -3     ws-sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
  
  Index: SandeshaQueue.java
  ===================================================================
  RCS file: /home/cvs/ws-sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
  retrieving revision 1.27
  retrieving revision 1.28
  diff -u -r1.27 -r1.28
  --- SandeshaQueue.java	2 Jul 2005 04:59:56 -0000	1.27
  +++ SandeshaQueue.java	2 Jul 2005 08:38:54 -0000	1.28
  @@ -147,11 +147,13 @@
           }
       }
   
  -    public RMMessageContext nextIncomingMessageToProcess(String sequenceId) throws QueueException {
  -        if (sequenceId == null)
  +    public RMMessageContext nextIncomingMessageToProcess(Object sequence) throws QueueException {
  +        if (sequence == null)
               return null;
   
  -        IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
  +        AbstractSequence absSeq=(AbstractSequence)sequence;
  +
  +       IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId());
           synchronized (sh) {
               if (sh == null)
                   throw new QueueException(Constants.Queue.SEQUENCE_ABSENT);
  @@ -338,6 +340,22 @@
           }
       }
   
  +       public Vector nextAllSeqsToProcess() {
  +           Vector seqs = new Vector();
  +
  +        synchronized (incomingMap) {
  +            Iterator it = incomingMap.keySet().iterator();
  +
  +            while (it.hasNext()) {
  +                Object tempKey = it.next();
  +                IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
  +                if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
  +                    seqs.add(sh);
  +            }
  +            return seqs;
  +        }
  +       }
  +
       public Vector nextAllSeqIdsToProcess() {
           Vector ids = new Vector();
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org