You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/10/20 12:32:18 UTC

cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server/queue SequenceHash.java ServerQueue.java

jaliya      2004/10/20 03:32:18

  Modified:    sandesha/src/org/apache/sandesha/server/queue
                        SequenceHash.java ServerQueue.java
  Log:
  Fixed the bugs and formatted the code
  
  Revision  Changes    Path
  1.3       +4 -3      ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java
  
  Index: SequenceHash.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/SequenceHash.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SequenceHash.java	13 Sep 2004 11:26:51 -0000	1.2
  +++ SequenceHash.java	20 Oct 2004 10:32:18 -0000	1.3
  @@ -44,7 +44,8 @@
       private HashMap hash;
   
       private boolean beingProcessedLock = false; //When true messages are
  -                                                // currently being processed.
  +
  +    // currently being processed.
   
       public SequenceHash(String sequenceId) {
           lastProcessed = 0;
  @@ -105,7 +106,7 @@
               refreshHasProcessableMessages();
           } else {
               setProcessLock(false); // Not a must. (det done in
  -                                   // refreshHasProcessableMessages();)
  +            // refreshHasProcessableMessages();)
           }
   
           return msg;
  @@ -142,7 +143,7 @@
           hasProcessableMessages = hash.containsKey(nextKey);
   
           if (!hasProcessableMessages) //Cant be being procesed if no messages to
  -                                     // process.
  +            // process.
               setProcessLock(false);
       }
   
  
  
  
  1.5       +65 -86    ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
  
  Index: ServerQueue.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ServerQueue.java	9 Oct 2004 08:50:23 -0000	1.4
  +++ ServerQueue.java	20 Oct 2004 10:32:18 -0000	1.5
  @@ -47,7 +47,8 @@
       ArrayList priorityQueue; // Acks and create seq. responses.
   
       HashMap queueBin; // Messaged processed from out queue will be moved
  -                              // to this.
  +
  +    // to this.
   
       private ServerQueue() {
           incomingMap = new HashMap();
  @@ -63,32 +64,6 @@
           return queue;
       }
   
  -    //This is wrong. No sequence context object is inside queue.
  -    //Only sequence id and messages (in sequence hash).
  -    /*
  -     * public void addRMSequenceContext(RMSequenceContext seq) throws
  -     * QueueException{
  -     * 
  -     * if(seq==null || seq.getSequenceID()==null ||
  -     * seq.getSequenceID().equals("")) throw new QueueException("Invalid
  -     * Sequence");
  -     * 
  -     * 
  -     * String seqId = seq.getSequenceID(); createNewSequence(seqId);
  -     *  }
  -     */
  -
  -    //This is wrong. No sequence context object is inside queue.
  -    //Only sequence id and messages (in sequence hash).
  -    /*
  -     * public RMSequenceContext getRMSequenceContext(String sequenceID){
  -     * if(sequenceID==null) return null;
  -     * 
  -     * Object obj = incomingMap.get(sequenceID); if(obj!=null && (obj instanceof
  -     * RMSequenceContext)) return (RMSequenceContext)
  -     * incomingMap.get(sequenceID); else return null; }
  -     */
  -
       /**
        * This will not replace messages automatically.
        */
  @@ -136,7 +111,6 @@
   
                   if (resSeqHash == null)
                       throw new QueueException("Inconsistent queue");
  -
                   resSeqHash.putNewMessage(msgCon);
               }
           }
  @@ -144,8 +118,8 @@
           return successful;
       }
   
  -    public boolean messagePresentInIncomingSequence(String sequenceId, Long messageNo)
  -            throws QueueException {
  +    public boolean messagePresentInIncomingSequence(String sequenceId,
  +            Long messageNo) throws QueueException {
   
           SequenceHash seqHash = (SequenceHash) incomingMap.get(sequenceId);
   
  @@ -160,7 +134,7 @@
       public boolean isIncomingSequenceExists(String seqId) {
   
           synchronized (incomingMap) {
  -            
  +
               return incomingMap.containsKey(seqId);
           }
       }
  @@ -228,7 +202,6 @@
               whileLoop: while (it.hasNext()) {
                   RMMessageContext tempMsg;
                   String tempKey = (String) it.next();
  -
                   ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                           .get(tempKey);
                   if (rsh.isOutSeqApproved()) {
  @@ -245,7 +218,8 @@
           return msg;
       }
   
  -    public void createNewIncomingSequence(String sequenceId) throws QueueException {
  +    public void createNewIncomingSequence(String sequenceId)
  +            throws QueueException {
           if (sequenceId == null)
               throw new QueueException("Sequence Id is null");
   
  @@ -322,8 +296,7 @@
                           // Acknowledgements.
                           default:
                               priorityQueue.remove(i);
  -                            queueBin.put(tempMsg.getMessageID(),
  -                                    tempMsg);
  +                            queueBin.put(tempMsg.getMessageID(), tempMsg);
                               msg = tempMsg;
                               break forLoop;
                           }
  @@ -472,7 +445,6 @@
                   .get(seqId);
   
           if (rsh == null) {
  -            System.out.println("ERROR: RESPONSE SEQ IS NULL");
               return;
           }
   
  @@ -486,7 +458,6 @@
                   .get(seqId);
   
           if (rsh == null) {
  -            System.out.println("ERROR: RESPONSE SEQ IS NULL");
               return;
           }
           synchronized (rsh) {
  @@ -495,20 +466,23 @@
       }
   
       public String getSequenceOfOutSequence(String outSequence) {
  -        if (outSequence == null){
  -             return null;
  +
  +        if (outSequence == null) {
  +            return null;
           }
  +
  +        //Client will always handle a single seq
  +        //if(outSequence==Constants.CLIENT_DEFAULD_SEQUENCE_ID)
  +        //    return outSequence;
  +
           Iterator it = outgoingMap.keySet().iterator();
           synchronized (outgoingMap) {
               while (it.hasNext()) {
  -                
  -              
  +
                   String tempSeqId = (String) it.next();
  -               
                   ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                           .get(tempSeqId);
                   String tempOutSequence = rsh.getOutSequenceId();
  -              
                   if (outSequence.equals(tempOutSequence))
                       return tempSeqId;
   
  @@ -519,9 +493,9 @@
   
       public void displayOutgoingMap() {
           Iterator it = outgoingMap.keySet().iterator();
  -        System.out.println("***************************************");
  -        System.out.println("    DISPLAYING RESPONSE MAP");
  -        System.out.println("    -----------------------");
  +        System.out.println("------------------------------------");
  +        System.out.println("      DISPLAYING RESPONSE MAP");
  +        System.out.println("------------------------------------");
           while (it.hasNext()) {
               String s = (String) it.next();
               System.out.println("\n Sequence id - " + s);
  @@ -536,14 +510,14 @@
                           + msgId + "-");
               }
           }
  -        System.out.println("***************************************");
  +        System.out.println("\n");
       }
   
       public void displayIncomingMap() {
           Iterator it = incomingMap.keySet().iterator();
  -        System.out.println("***************************************");
  -        System.out.println("    DISPLAYING SEQUENCE MAP");
  -        System.out.println("    -----------------------");
  +        System.out.println("------------------------------------");
  +        System.out.println("       DISPLAYING SEQUENCE MAP");
  +        System.out.println("------------------------------------");
           while (it.hasNext()) {
               String s = (String) it.next();
               System.out.println("\n Sequence id - " + s);
  @@ -557,14 +531,14 @@
                           + msgId + "-");
               }
           }
  -        System.out.println("***************************************");
  +        System.out.println("\n");
       }
   
       public void displayPriorityQueue() {
   
  -        System.out.println("***************************************");
  -        System.out.println("    DISPLAYING PRIORITY QUEUE");
  -        System.out.println("    -------------------------");
  +        System.out.println("------------------------------------");
  +        System.out.println("       DISPLAYING PRIORITY QUEUE");
  +        System.out.println("------------------------------------");
   
           Iterator it = priorityQueue.iterator();
           while (it.hasNext()) {
  @@ -574,41 +548,45 @@
   
               System.out.println("Message " + id + "  Type " + type);
           }
  -        System.out.println("***************************************");
  +        System.out.println("\n");
       }
   
       public void moveOutgoingMsgToBin(String sequenceId, Long messageNo) {
  -        String sequence =  getSequenceOfOutSequence(sequenceId);
  +        String sequence = getSequenceOfOutSequence(sequenceId);
           ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                   .get(sequence);
  -       
  -       
  +
           if (rsh == null) {
  -            System.out.println("ERROR: RESPONSE SEQ IS NULL");
  +            System.out.println("ERROR: Response sequence is NULL " + sequence);
               return;
           }
   
           synchronized (rsh) {
               //Deleting retuns the deleted message.
               RMMessageContext msg = rsh.deleteMessage(messageNo);
  -            String msgId = msg.getMessageID();
  +            //If we have already deleted it, there is no message to move.
  +            if (msg != null) {
   
  -            //Add msg to bin if id isnt null.
  -            if (msgId != null)
  -                queueBin.put(msgId, msg);
  +                String msgId = msg.getMessageID();
  +                System.out
  +                        .println("INFO: Moving out going messages to bin : msgId "
  +                                + msgId);
  +                //Add msg to bin if id isnt null.
  +                if (msgId != null)
  +                    queueBin.put(msgId, msg);
  +            }
   
           }
       }
   
       public void movePriorityMsgToBin(String messageId) {
           synchronized (priorityQueue) {
  -
               int size = priorityQueue.size();
  -
               for (int i = 0; i < size; i++) {
                   RMMessageContext msg = (RMMessageContext) priorityQueue.get(i);
   
                   if (msg.getMessageID().equals(messageId)) {
  +
                       priorityQueue.remove(i);
                       queueBin.put(messageId, msg);
                       return;
  @@ -616,27 +594,28 @@
               }
           }
       }
  -    
  -    public long getNextOutgoingMessageNumber (String seq){
  -		ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap.get(seq);	
  -		if(rsh==null)  //saquence not created yet.
  -			return 1;
  -			
  -        synchronized (rsh){
  -
  -        	    
  -        	Iterator keys = rsh.getAllKeys().iterator();
  -        	
  -        	long l = 0;
  -        	
  -        	while(keys.hasNext()){
  -        		long temp = ((Long) keys.next()).longValue();
  -        		if(temp>l)
  -        		    l = temp;    
  -        	}
  -        	
  -        	return (l);
  -        	
  +
  +    public long getNextOutgoingMessageNumber(String seq) {
  +        ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap.get(seq);
  +        if (rsh == null) { //sequence not created yet.
  +            return 1;
  +        }
  +
  +        synchronized (rsh) {
  +            Iterator keys = rsh.getAllKeys().iterator();
  +
  +            long msgNo = 0;
  +
  +            while (keys.hasNext()) {
  +                long temp = ((Long) keys.next()).longValue();
  +                if (temp > msgNo)
  +                    msgNo = temp;
  +
  +            }
  +            msgNo++;
  +
  +            return (msgNo);
  +
           }
       }
   }