You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/02/19 22:46:01 UTC

svn commit: r746009 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Author: cwiklik
Date: Thu Feb 19 21:46:00 2009
New Revision: 746009

URL: http://svn.apache.org/viewvc?rev=746009&view=rev
Log:
UIMA-1194 Added the time spent waiting for a free CAS to the outgoing reply. Removed dead code. Removed code that handled messaging to collocated delegates.

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=746009&r1=746008&r2=746009&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Feb 19 21:46:00 2009
@@ -263,12 +263,6 @@
 				bos.close();
 			}
 		}
-//		if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
-//		{
-//			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
-//	                "serializeCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_dump_serialized_cas__FINEST",
-//	                new Object[] {aCasReferenceId, serializedCas});
-//		}
 
 		LongNumericStatistic statistic;
 		if ( (statistic = getAnalysisEngineController().getMonitor().getLongNumericStatistic("",Monitor.TotalSerializeTime)) != null )
@@ -622,7 +616,7 @@
 			}
 			else
 			{
-				sendCasToCollocatedDelegate(true, aCasReferenceId, null, anEndpoint, true,0 );
+        // Not supported
 			}
 		}
 		catch( ServiceShutdownException e)
@@ -712,12 +706,7 @@
 			}
 			else
 			{
-		    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
-	                    new Object[] { anEndpoint.getEndpoint(), aNewCasReferenceId, sequence });
-		    }
-				sendCasToCollocatedDelegate(false, anInputCasReferenceId, aNewCasReferenceId, anEndpoint, false, sequence);
+        // Not supported
 			}
 		}
 		catch( ServiceShutdownException e)
@@ -758,26 +747,7 @@
 			}
 			else
 			{
-		    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-		      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                    "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_reply_with_sequence__FINE",
-	                    new Object[] { anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getCasSequence() });
-		    }
-	      if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController && getAnalysisEngineController().isCasMultiplier() )
-	      {
-	         if ( ((AggregateAnalysisEngineController)getAnalysisEngineController()).getMessageOrigin(entry.getInputCasReferenceId()) != null)
-	         {
-	           sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
-	         }
-	         else
-	         {
-	           sendCasToCollocatedDelegate(false, entry.getCasReferenceId(), null, anEndpoint, false, 0);
-	         }
-	      }
-	      else
-	      {
-           sendCasToCollocatedDelegate(false, entry.getInputCasReferenceId(), entry.getCasReferenceId(), anEndpoint, false, entry.getCasSequence());
-	      }
+        // Not supported
 			}
 		}
 		catch( ServiceShutdownException e)
@@ -887,14 +857,10 @@
           }
           sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
         }
-          
-			
-			
-			
 			}
 			else
 			{
-				sendCasToCollocatedDelegate(false, aCasReferenceId, null, anEndpoint, false, 0);
+			  // Not supported
 			}
 		}
 		catch( ServiceShutdownException e)
@@ -1181,34 +1147,6 @@
 			throw new AsynchAEException(e);
 		}
 	}
-  private byte[] getSerializedBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
-  {
-    CAS cas = null;
-    try
-    {
-      byte[] serializedCAS = null;
-      //  Using Cas reference Id retrieve CAS from the shared Cash
-      cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
-      ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
-        CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-        long t1 = getAnalysisEngineController().getCpuTime();
-        serializedCAS = uimaSerializer.serializeCasToBinary(cas);
-        long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1;
-        
-        getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);
-        
-        entry.incrementTimeToSerializeCAS(timeToSerializeCas);
-        casStats.incrementCasSerializationTime(timeToSerializeCas);
-        getAnalysisEngineController().getServicePerformance().
-          incrementCasSerializationTime(timeToSerializeCas);
-      return serializedCAS;
-    }
-    catch( Exception e)
-    {
-      throw new AsynchAEException(e);
-    }
-  }
-	
   private byte[] getBinaryCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception
   {
     try
@@ -1282,8 +1220,10 @@
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime());
 				aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime());
         aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
-				long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
-				aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
+        aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS,
+                getAnalysisEngineController().getServicePerformance().getTimeWaitingForCAS());
+        long iT =getAnalysisEngineController().getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
+        aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT );
 				String lookupKey = getAnalysisEngineController().getName();
 				long arrivalTime = getAnalysisEngineController().getTime( aCasReferenceId, lookupKey); //serviceInputEndpoint);
 				long timeInService = getAnalysisEngineController().getCpuTime()-arrivalTime;
@@ -1416,62 +1356,6 @@
 			aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
 		}
 	}
-	/**
-	 * @deprecated
-	 * @param aMessage
-	 * @param anEndpoint
-	 * @param isProcessRequest
-	 * @throws Exception
-	 */
-	private void populateHeaderWithContext( Message aMessage, Endpoint anEndpoint, boolean isProcessRequest ) throws Exception
-	{
-		if (anEndpoint.isRemote())
-		{
-			aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
-			
-			if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController && isProcessRequest)
-			{
-				String protocol = serviceProtocolList; 
-				if ( anEndpoint.getServerURI().trim().toLowerCase().startsWith("http"))
-				{
-					protocol = extractURLWithProtocol(serviceProtocolList, "http");
-				}
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                    "populateHeaderWithContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_remote_FINE",
-	                    new Object[] {protocol, anEndpoint.getEndpoint()});
-        }
-				aMessage.setStringProperty(UIMAMessage.ServerURI, protocol);
-			}
-			else
-			{
-				aMessage.setStringProperty(UIMAMessage.ServerURI, serverURI);
-			}
-		}
-		else
-		{
-			aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
-			aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
-		}
-		if ( isProcessRequest )
-		{
-			aMessage.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process); 
-			if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController)
-			{
-				aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
-				aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); 
-
-			}
-			else
-			{
-				aMessage.setStringProperty(AsynchAEMessage.MessageFrom, serviceInputEndpoint);
-				aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Response); 
-			}
-			
-		}
-
-	}
-
 	public AnalysisEngineController getAnalysisEngineController()
 	{
 		return analysisEngineController;
@@ -1493,33 +1377,6 @@
 		this.controllerInputEndpoint = controllerInputEndpoint;
 	}
 
-	private void ackMessage(String aCasReferenceId) throws Exception
-	{
-		if ( getAnalysisEngineController().getInputChannel().getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE )
-		{
-			((Message)getAnalysisEngineController().
-					getInProcessCache().
-						getMessageAccessorByReference(aCasReferenceId).
-							getRawMessage()).acknowledge();
-
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "ackMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_ack_msg__FINE",
-	                new Object[] { aCasReferenceId  });
-      }
-		}
-
-	}
-	private boolean requiresAck( Message aMessage, boolean isFinal, int ackMode) throws JMSException
-	{
-		if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response && 
-			 isFinal && ackMode == Session.CLIENT_ACKNOWLEDGE )
-		{
-			return true;
-		}
-		return false;
-	}
-	
 	private void dispatch( Message aMessage,  Endpoint anEndpoint, CacheEntry entry, boolean isRequest, JmsEndpointConnection_impl endpointConnection, long msgSize ) throws Exception {
 	  //  Add stats
     populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest);
@@ -1968,148 +1825,11 @@
 		return entry.getCasReferenceId();
 	}
 	
-	private boolean isProcessReply( Message aMessage )
-	{
-		try
-		{
-			if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response && 
-					 aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.Process )
-			{
-				return true;
-			}
-		}
-		catch( JMSException e) {}
-		return false;
-	}
 	private void addIdleTime( Message aMessage )
 	{
-
-/*
-		if ( isProcessReply(aMessage ) && 
-			 ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController || 
-			   !getAnalysisEngineController().isCasMultiplier() ) )
-
-*/
 		long t = System.nanoTime();
 		getAnalysisEngineController().saveReplyTime(t, "");
 	}
-	private void sendCasToCollocatedDelegate(boolean isRequest, String anInputCasReferenceId, String aNewCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence) 
-	throws AsynchAEException, ServiceShutdownException
-	{
-		try
-		{
-			//	Determine which of the CAS id's to log. If the CAS was generated by a CM its sequence
-			//	 will be > 0. In this instance use the id in 'aNewCASReferenceId'. Otherwise use the
-			//	the default which is the id of an Input CAS.
-			String id2Log = anInputCasReferenceId;
-			if ( sequence > 0 )
-			{
-				id2Log = aNewCasReferenceId;
-			}
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service__FINE",
-                    new Object[] {anEndpoint.getEndpoint(), id2Log});
-      }
-			//	Get the connection object for a given endpoint
-			JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-			//	Create empty JMS Text Message
-			TextMessage tm = endpointConnection.produceTextMessage("");
-			//	Add common properties to the JMS Header
-			if ( isRequest == true )
-			{
-				populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); 
-			}
-			else
-			{
-				populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
-			}
-			
-			
-			tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.CASRefID); 
-			//	Add Cas Reference Id to the outgoing JMS Header
-			if ( aNewCasReferenceId != null )
-			{
-				tm.setStringProperty(AsynchAEMessage.CasReference, aNewCasReferenceId);
-/*
-				//	Add the initial Input Cas Reference Id. This is the top ancestor from
-				//	which all other CASes are produced
-				addTopCASParentReferenceId(tm, anInputCasReferenceId);
-*/
-				tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
-				tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
-				//	Override MessageType set in the populateHeaderWithContext above.
-				//	Add stats
-				populateStats(tm, anEndpoint, aNewCasReferenceId, AsynchAEMessage.Process, isRequest);
-				
-				//	Make the reply message look like a request. This message will contain a new CAS 
-				//	produced by the CAS Multiplier. The client will treat this CAS
-				//	differently from the input CAS. 
-				tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-				isRequest = true;
-
-				if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) )
-				{
-					CacheEntry cacheEntry = getCacheEntry(aNewCasReferenceId);
-					if ( cacheEntry != null )
-					{
-						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-			                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
-			                    new Object[] {getAnalysisEngineController().getComponentName(),"Collocated", anEndpoint.getEndpoint(), aNewCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() });
-					}
-				}
-
-			
-			}
-			else
-			{
-				tm.setStringProperty(AsynchAEMessage.CasReference, anInputCasReferenceId);
-				
-				if ( getAnalysisEngineController().getInProcessCache() != null &&
-						getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(anInputCasReferenceId).isAborted() )	
-				{
-					tm.setBooleanProperty(AsynchAEMessage.Aborted, true);
-				}
-
-				//	Add stats
-				populateStats(tm, anEndpoint, anInputCasReferenceId, AsynchAEMessage.Process, isRequest);
-				if ( startTimer )
-				{
-					//	Start a timer for this request. The amount of time to wait
-					//	for response is provided in configuration for the endpoint
-					anEndpoint.startProcessRequestTimer(anInputCasReferenceId);
-				}
-			}
-			// ----------------------------------------------------
-			//	Send Request Messsage to Delegate
-			// ----------------------------------------------------
-			endpointConnection.send(tm, 0, startTimer);
-			if ( !isRequest )
-			{
-				addIdleTime(tm);
-			}
-			
-		}
-		catch( JMSException e)
-		{
-			//	Unable to establish connection to the endpoint. Logit and continue
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-                    "sendCasToCollocatedDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
-                    new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()});
-      }
-		}
-		catch( ServiceShutdownException e)
-		{
-			throw e;
-		}
-		catch( Exception e)
-		{
-			throw new AsynchAEException(e);
-		}
-		
-	}
-
 	private CacheEntry getCacheEntry( String aCasReferenceId) throws Exception
 	{
 		CacheEntry cacheEntry = null;
@@ -2120,22 +1840,6 @@
 		}
 		return cacheEntry; 
 	}
-	private void addTopCASParentReferenceId( TextMessage tm, String aCasReferenceId) throws Exception
-	{
-		CacheEntry cacheEntry = null;
-		if ( ( cacheEntry = getCacheEntry(aCasReferenceId)) != null )
-		{
-			if ( cacheEntry.getInputCasReferenceId() == null )
-			{
-				tm.setStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
-			}
-			else
-			{
-				tm.setStringProperty(AsynchAEMessage.InputCasReference, cacheEntry.getInputCasReferenceId());
-			}
-		}
-		
-	}
 	public void stop()
 	{
 		aborting = true;