You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ja...@apache.org on 2004/09/07 12:55:13 UTC

cvs commit: ws-fx/sandesha/src/org/apache/sandesha/server/queue ServerQueue.java

jaliya      2004/09/07 03:55:13

  Added:       sandesha/src/org/apache/sandesha/server/queue
                        ServerQueue.java
  Log:
  Implementation of the message queue on the server side.
  
  Revision  Changes    Path
  1.1                  ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
  
  Index: ServerQueue.java
  ===================================================================
  /*
   * Copyright  1999-2004 The Apache Software Foundation.
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   *
   */
  
  package org.apache.sandesha.server.queue;
  import java.util.ArrayList;
  import java.util.Date;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Set;
  import java.util.Vector;
  
  import org.apache.log4j.Priority;
  import org.apache.sandesha.Constants;
  import org.apache.sandesha.RMMessage;
  import org.apache.sandesha.RMMessageContext;
  import org.apache.sandesha.RMSequence;
  import org.apache.sandesha.RMSequenceContext;
  import org.apache.sandesha.server.ReTransmissionProcessor;
  
  
  /*
   * Created on Aug 4, 2004 at 4:49:49 PM
   */
  
  /**
   * Singleton that holds the server-side message queues.
   * <p>
   * Four collections are maintained:
   * <ul>
   *   <li>{@link #sequenceMap} - sequence id to {@link SequenceHash} (incoming messages);</li>
   *   <li>{@link #responseMap} - sequence id to {@link ResponseSequenceHash} (response messages);</li>
   *   <li>{@link #responseQueue} - priority messages (acks, create-sequence requests/responses);</li>
   *   <li>{@link #responseQueueBin} - message id to message, for messages already taken
   *       out of the outgoing queues.</li>
   * </ul>
   * The queue stores only sequence ids and their messages; sequence context objects
   * are deliberately not kept here.
   * <p>
   * Locking: each collection is guarded by its own monitor, and each
   * SequenceHash / ResponseSequenceHash is locked individually while a single
   * sequence is being read or modified. A map lock is never requested while a
   * per-sequence lock is held, and the bin lock is always the innermost lock.
   *
   * @author Chamikara Jayalath
   * @author Jaliya Ekanayaka
   */
  public class ServerQueue {

      /** Lazily created singleton; guarded by the class lock taken in getInstance(). */
      private static ServerQueue queue = null;

      HashMap sequenceMap;      // Incoming messages: sequence id -> SequenceHash.
      HashMap responseMap;      // Response messages: sequence id -> ResponseSequenceHash.
      ArrayList responseQueue;  // Acks and create-sequence requests/responses.

      HashMap responseQueueBin; // Messages processed from the out queues are moved here.

      private ServerQueue() {
          sequenceMap = new HashMap();
          responseMap = new HashMap();
          responseQueue = new ArrayList();
          responseQueueBin = new HashMap();
      }

      /**
       * Returns the single ServerQueue instance, creating it on first use.
       * Synchronized so that two threads racing on the first call cannot
       * create two different queues.
       */
      public static synchronized ServerQueue getInstance() {
          if (queue == null) {
              queue = new ServerQueue();
          }
          return queue;
      }

      /** Looks up an incoming sequence under the map lock; null when unknown. */
      private SequenceHash lookupSequence(String sequenceId) {
          synchronized (sequenceMap) {
              return (SequenceHash) sequenceMap.get(sequenceId);
          }
      }

      /** Looks up a response sequence under the map lock; null when unknown. */
      private ResponseSequenceHash lookupResponseSequence(String sequenceId) {
          synchronized (responseMap) {
              return (ResponseSequenceHash) responseMap.get(sequenceId);
          }
      }

      /** Stores a message in the bin under the bin's own lock. */
      private void putInBin(String messageId, RMMessageContext msg) {
          synchronized (responseQueueBin) {
              responseQueueBin.put(messageId, msg);
          }
      }

      /**
       * Adds a message to an existing incoming sequence.
       * Existing messages are never replaced automatically.
       *
       * @param seqId     id of the target sequence
       * @param messageNo message number used as the key inside the sequence
       * @param msgCon    message to store
       * @return true if the message was stored; false if the sequence is unknown
       * @throws QueueException if seqId or msgCon is null, or if the sequence
       *                        already holds a message with this number
       */
      public boolean addMessageToSequence(String seqId, Long messageNo, RMMessageContext msgCon)
              throws QueueException {
          if (seqId == null || msgCon == null) {
              throw new QueueException("Error in adding message");
          }

          // Resolve first, lock second: synchronizing on a null reference would
          // throw a NullPointerException before any check could run.
          SequenceHash seqHash = lookupSequence(seqId);
          if (seqHash == null) {
              return false;
          }

          synchronized (seqHash) {
              if (seqHash.hasMessage(messageNo)) {
                  // Messages will not be replaced automatically.
                  throw new QueueException("Message already exists");
              }
              seqHash.putNewMessage(messageNo, msgCon);
          }
          return true;
      }

      /**
       * Adds a message to an existing response sequence.
       *
       * @param seqId  id of the target response sequence
       * @param msgCon message to store
       * @return true if the message was stored; false if the sequence is unknown
       * @throws QueueException if seqId or msgCon is null
       */
      public boolean addMessageToResponseSequence(String seqId, RMMessageContext msgCon)
              throws QueueException {
          if (seqId == null || msgCon == null) {
              throw new QueueException("Error in adding message");
          }

          ResponseSequenceHash resSeqHash = lookupResponseSequence(seqId);
          if (resSeqHash == null) {
              return false;
          }

          synchronized (resSeqHash) {
              resSeqHash.putNewMessage(msgCon);
          }
          return true;
      }

      /**
       * Tells whether the given incoming sequence holds the given message number.
       *
       * @throws QueueException if the sequence is unknown
       */
      public boolean messagePresentInSequence(String sequenceId, Long messageNo)
              throws QueueException {
          SequenceHash seqHash = lookupSequence(sequenceId);
          if (seqHash == null) {
              throw new QueueException("Sequence not present");
          }

          synchronized (seqHash) {
              return seqHash.hasMessage(messageNo);
          }
      }

      /** Returns true if an incoming sequence with this id has been created. */
      public boolean isSequenceExists(String seqId) {
          synchronized (sequenceMap) {
              return sequenceMap.containsKey(seqId);
          }
      }

      /** Returns true if a response sequence with this id has been created. */
      public boolean isResponseSequenceExists(String resSeqId) {
          synchronized (responseMap) {
              return responseMap.containsKey(resSeqId);
          }
      }

      /**
       * Returns the id of the first incoming sequence (in map iteration order)
       * that reports processable messages, or null if there is none.
       * Unlike {@link #nextAllSeqIdsToProcess()} this does not look at the
       * sequence lock.
       */
      public String nextSequenceIdToProcess() {
          synchronized (sequenceMap) {
              Iterator it = sequenceMap.keySet().iterator();
              while (it.hasNext()) {
                  String candidateId = (String) it.next();
                  SequenceHash sh = (SequenceHash) sequenceMap.get(candidateId);
                  if (sh.hasProcessableMessages()) {
                      return candidateId;
                  }
              }
              return null;
          }
      }

      /**
       * Returns the next message to process from the given incoming sequence.
       *
       * @return the next message, or null if sequenceId is null or the sequence
       *         has no processable messages
       * @throws QueueException if the sequence is unknown
       */
      public RMMessageContext nextMessageToProcess(String sequenceId) throws QueueException {
          if (sequenceId == null) {
              return null;
          }

          SequenceHash sh = lookupSequence(sequenceId);
          if (sh == null) {
              throw new QueueException("Sequence id doen not exist");
          }

          synchronized (sh) {
              if (!sh.hasProcessableMessages()) {
                  return null;
              }
              return sh.getNextMessageToProcess();
          }
      }

      /**
       * Scans the response sequences and returns the first available message
       * from a sequence whose out-sequence has been approved.
       *
       * @return the next response message, or null if none is ready
       */
      public RMMessageContext nextResponseMessageToSend() throws QueueException {
          synchronized (responseMap) {
              Iterator it = responseMap.keySet().iterator();
              while (it.hasNext()) {
                  String key = (String) it.next();
                  ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(key);
                  if (rsh.isOutSeqApproved()) {
                      RMMessageContext candidate = rsh.getNextMessageToSend();
                      if (candidate != null) {
                          return candidate;
                      }
                  }
              }
          }
          return null;
      }

      /**
       * Registers a new, empty incoming sequence under the given id.
       * An existing entry with the same id is replaced.
       *
       * @throws QueueException if sequenceId is null
       */
      public void createNewSequence(String sequenceId) throws QueueException {
          if (sequenceId == null) {
              throw new QueueException("Sequence Id is null");
          }

          synchronized (sequenceMap) {
              sequenceMap.put(sequenceId, new SequenceHash(sequenceId));
          }
      }

      /**
       * Registers a new, empty response sequence under the given id.
       * An existing entry with the same id is replaced.
       *
       * @throws QueueException if sequenceId is null
       */
      public void createNewResponseSequence(String sequenceId) throws QueueException {
          if (sequenceId == null) {
              throw new QueueException("Sequence Id is null");
          }

          synchronized (responseMap) {
              responseMap.put(sequenceId, new ResponseSequenceHash(sequenceId));
          }
      }

      /**
       * Appends a message to the priority (responses) queue.
       *
       * @throws QueueException if msg is null
       */
      public void addPriorityMessage(RMMessageContext msg) throws QueueException {
          if (msg == null) {
              throw new QueueException("Message is null");
          }

          synchronized (responseQueue) {
              responseQueue.add(msg);
          }
      }

      /**
       * Picks the next priority message to send.
       * <p>
       * The queue is scanned from the front. A create-sequence request stays in
       * the queue and is returned only when at least
       * Constants.RETRANSMISSION_INTERVAL has passed since it was last sent (its
       * last-sent time is then updated). Any other message (create-sequence
       * responses, acknowledgements) is removed from the queue, moved to the bin
       * and returned immediately.
       *
       * @return the message to send, or null if nothing is due
       */
      public RMMessageContext nextPriorityMessageToSend() throws QueueException {
          synchronized (responseQueue) {
              int size = responseQueue.size();
              for (int i = 0; i < size; i++) {
                  RMMessageContext candidate = (RMMessageContext) responseQueue.get(i);
                  if (candidate == null) {
                      continue;
                  }

                  if (candidate.getMessageType() == Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST) {
                      // Create-sequence requests are retransmitted, never removed here.
                      long lastSentTime = candidate.getLastSentTime();
                      long currentTime = System.currentTimeMillis();
                      if (currentTime >= lastSentTime + Constants.RETRANSMISSION_INTERVAL) {
                          candidate.setLastSentTime(currentTime);
                          return candidate;
                      }
                  } else {
                      // Every other message type is sent once and then binned.
                      responseQueue.remove(i);
                      putInBin(candidate.getMessageID(), candidate);
                      return candidate;
                  }
              }
              return null;
          }
      }

      /**
       * Returns all messages of the given incoming sequence that are ready to
       * be processed, as supplied by SequenceHash.getNextMessagesToProcess().
       *
       * @throws QueueException if the sequence is unknown
       */
      public Vector nextAllMessagesToProcess(String sequenceId) throws QueueException {
          SequenceHash sh = lookupSequence(sequenceId);
          if (sh == null) {
              throw new QueueException("Sequence not present");
          }

          synchronized (sh) {
              return sh.getNextMessagesToProcess();
          }
      }

      /**
       * Returns the ids of all incoming sequences that report processable
       * messages and are not currently locked.
       */
      public Vector nextAllSeqIdsToProcess() {
          Vector ids = new Vector();

          synchronized (sequenceMap) {
              Iterator it = sequenceMap.keySet().iterator();
              while (it.hasNext()) {
                  SequenceHash sh = (SequenceHash) sequenceMap.get(it.next());
                  if (sh.hasProcessableMessages() && !sh.isSequenceLocked()) {
                      ids.add(sh.getSequenceId());
                  }
              }
          }
          return ids;
      }

      /**
       * Empties every queue and map held by this object.
       *
       * @param yes safety flag; nothing happens unless it is true
       */
      public void clear(boolean yes) {
          if (!yes) {
              return;
          }

          // Locks are taken one after another, never nested.
          synchronized (sequenceMap) {
              sequenceMap.clear();
          }
          synchronized (responseQueue) {
              responseQueue.clear();
          }
          synchronized (responseMap) {
              responseMap.clear();
          }
          synchronized (responseQueueBin) {
              responseQueueBin.clear();
          }
      }

      /**
       * Clears the messages of one incoming sequence; an unknown sequence id
       * is ignored.
       *
       * @param yes safety flag; nothing happens unless it is true
       */
      public void removeAllMsgsFromSeqence(String seqId, boolean yes) {
          if (!yes) {
              return;
          }

          SequenceHash sh = lookupSequence(seqId);
          if (sh == null) {
              return;
          }
          synchronized (sh) {
              sh.clearSequence(yes);
          }
      }

      /**
       * Clears the messages of one response sequence; an unknown sequence id
       * is ignored.
       *
       * @param yes safety flag; nothing happens unless it is true
       */
      public void removeAllMsgsFromResponseSeqence(String seqId, boolean yes) {
          if (!yes) {
              return;
          }

          ResponseSequenceHash rsh = lookupResponseSequence(seqId);
          if (rsh == null) {
              return;
          }
          synchronized (rsh) {
              rsh.clearSequence(yes);
          }
      }

      /**
       * Removes an incoming sequence from the queue.
       *
       * @param yes safety flag; nothing happens unless it is true
       */
      public void removeSequence(String sequenceId, boolean yes) {
          if (!yes) {
              return;
          }

          synchronized (sequenceMap) {
              sequenceMap.remove(sequenceId);
          }
      }

      /**
       * Removes a response sequence from the queue.
       *
       * @param yes safety flag; nothing happens unless it is true
       */
      public void removeResponseSequence(String sequenceId, boolean yes) {
          if (!yes) {
              return;
          }

          synchronized (responseMap) {
              responseMap.remove(sequenceId);
          }
      }

      /**
       * Sets or releases the process lock of an incoming sequence. An unknown
       * sequence id is reported on standard output and otherwise ignored.
       */
      public void setSequenceLock(String sequenceId, boolean lock) {
          SequenceHash sh = lookupSequence(sequenceId);
          if (sh == null) {
              System.out.println("ERROR: SEQUENCE IS NULL");
              return;
          }

          synchronized (sh) {
              sh.setProcessLock(lock);
          }
      }

      /**
       * Returns the keys (message numbers) stored for an incoming sequence,
       * or an empty set when the sequence is unknown.
       */
      public Set getAllReceivedMsgNumsOfSeq(String sequenceId) {
          SequenceHash sh = lookupSequence(sequenceId);
          if (sh == null) {
              return new HashSet();
          }

          synchronized (sh) {
              return sh.getAllKeys();
          }
      }

      /**
       * Returns the keys (message numbers) stored for a response sequence,
       * or an empty set when the sequence is unknown.
       */
      public Set getAllReceivedMsgNumsOfResponseSeq(String sequenceId) {
          ResponseSequenceHash rsh = lookupResponseSequence(sequenceId);
          if (rsh == null) {
              return new HashSet();
          }

          synchronized (rsh) {
              return rsh.getAllKeys();
          }
      }

      /**
       * Tells whether the given incoming sequence holds the given message.
       * Returns false when the sequence itself is unknown, which can happen
       * when no messages have arrived yet.
       */
      public boolean isMessageExists(String sequenceId, Long messageNo) {
          SequenceHash sh = lookupSequence(sequenceId);
          if (sh == null) {
              return false;
          }

          synchronized (sh) {
              return sh.hasMessage(messageNo);
          }
      }

      /**
       * Records the out-sequence id of a response sequence. An unknown
       * sequence id is reported on standard output and otherwise ignored.
       */
      public void setOutSequence(String seqId, String outSeqId) {
          ResponseSequenceHash rsh = lookupResponseSequence(seqId);
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL");
              return;
          }

          synchronized (rsh) {
              rsh.setOutSequenceId(outSeqId);
          }
      }

      /**
       * Marks the out-sequence of a response sequence as approved or not.
       * An unknown sequence id is reported on standard output and otherwise
       * ignored.
       */
      public void setOutSequenceApproved(String seqId, boolean approved) {
          ResponseSequenceHash rsh = lookupResponseSequence(seqId);
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL");
              return;
          }

          synchronized (rsh) {
              rsh.setOutSeqApproved(approved);
          }
      }

      /**
       * Finds the response sequence whose out-sequence id equals the argument.
       *
       * @return the matching sequence id, or null if outSequence is null or
       *         no response sequence uses it
       */
      public String getSequenceOfOutSequence(String outSequence) {
          if (outSequence == null) {
              return null;
          }

          synchronized (responseMap) {
              // The iterator must be created inside the lock, otherwise a
              // concurrent put/remove can invalidate it before the scan starts.
              Iterator it = responseMap.keySet().iterator();
              while (it.hasNext()) {
                  String candidateId = (String) it.next();
                  ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(candidateId);
                  if (outSequence.equals(rsh.getOutSequenceId())) {
                      return candidateId;
                  }
              }
          }
          return null;
      }

      /** Debug aid: prints every response sequence and its message ids to standard output. */
      public void displayResponseMap() {
          System.out.println("***************************************");
          System.out.println("    DISPLAYING RESPONSE MAP");
          System.out.println("    -----------------------");
          synchronized (responseMap) {
              Iterator it = responseMap.keySet().iterator();
              while (it.hasNext()) {
                  String s = (String) it.next();
                  System.out.println("\n Sequence id - " + s);
                  ResponseSequenceHash rsh = (ResponseSequenceHash) responseMap.get(s);

                  Iterator it1 = rsh.getAllKeys().iterator();
                  while (it1.hasNext()) {
                      Long l = (Long) it1.next();
                      String msgId = rsh.getMessageId(l);
                      System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
                  }
              }
          }
          System.out.println("***************************************");
      }

      /** Debug aid: prints every incoming sequence and its message ids to standard output. */
      public void displaySequenceMap() {
          System.out.println("***************************************");
          System.out.println("    DISPLAYING SEQUENCE MAP");
          System.out.println("    -----------------------");
          synchronized (sequenceMap) {
              Iterator it = sequenceMap.keySet().iterator();
              while (it.hasNext()) {
                  String s = (String) it.next();
                  System.out.println("\n Sequence id - " + s);
                  SequenceHash sh = (SequenceHash) sequenceMap.get(s);

                  Iterator it1 = sh.getAllKeys().iterator();
                  while (it1.hasNext()) {
                      Long l = (Long) it1.next();
                      String msgId = sh.getMessageId(l);
                      System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
                  }
              }
          }
          System.out.println("***************************************");
      }

      /** Debug aid: prints the id and type of every queued priority message to standard output. */
      public void displayPriorityQueue() {
          System.out.println("***************************************");
          System.out.println("    DISPLAYING PRIORITY QUEUE");
          System.out.println("    -------------------------");

          synchronized (responseQueue) {
              Iterator it = responseQueue.iterator();
              while (it.hasNext()) {
                  RMMessageContext msg = (RMMessageContext) it.next();
                  String id = msg.getMessageID();
                  int type = msg.getMessageType();

                  System.out.println("Message " + id + "  Type " + type);
              }
          }
          System.out.println("***************************************");
      }

      /**
       * Deletes a message from a response sequence and, if it carries a
       * message id, stores it in the bin. An unknown sequence id is reported
       * on standard output and otherwise ignored.
       */
      public void moveResponseMsgToBin(String sequenceId, Long messageNo) {
          ResponseSequenceHash rsh = lookupResponseSequence(sequenceId);
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL");
              return;
          }

          synchronized (rsh) {
              // Deleting returns the deleted message.
              RMMessageContext msg = rsh.deleteMessage(messageNo);
              if (msg == null) {
                  // NOTE(review): presumably returned when no such message is
                  // stored - confirm against ResponseSequenceHash.deleteMessage.
                  return;
              }

              // Add msg to the bin only if its id isn't null.
              String msgId = msg.getMessageID();
              if (msgId != null) {
                  putInBin(msgId, msg);
              }
          }
      }

      /**
       * Moves the first priority message with the given message id from the
       * priority queue to the bin. Nothing happens when messageId is null or
       * no queued message has that id.
       */
      public void movePriorityMsgToBin(String messageId) {
          if (messageId == null) {
              return;
          }

          synchronized (responseQueue) {
              int size = responseQueue.size();
              for (int i = 0; i < size; i++) {
                  RMMessageContext msg = (RMMessageContext) responseQueue.get(i);

                  // Compare from the non-null side so that a queued message
                  // without an id cannot cause a NullPointerException.
                  if (msg != null && messageId.equals(msg.getMessageID())) {
                      responseQueue.remove(i);
                      putInBin(messageId, msg);
                      return;
                  }
              }
          }
      }
  }