You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2008/06/02 23:07:17 UTC

svn commit: r662564 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima: adapter/jms/client/BaseMessageSender.java adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java jms/error/handler/JMSExceptionHandler.java

Author: schor
Date: Mon Jun  2 14:07:17 2008
New Revision: 662564

URL: http://svn.apache.org/viewvc?rev=662564&view=rev
Log:
[UIMA-1056] patch 2

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Mon Jun  2 14:07:17 2008
@@ -119,6 +119,7 @@
 	/**
 	 * Signals any object that waits for the worker thread to initialize
 	 */
+
 	private void signal() {
 		synchronized (this) {
 			this.notifyAll();

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Jun  2 14:07:17 2008
@@ -506,13 +506,11 @@
 	 * Sends a given CAS for analysis to the UIMA EE Service.
 	 * 
 	 */
-	private synchronized String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
+	private String sendCAS(CAS aCAS, ClientRequest requestToCache) throws ResourceProcessException
 	{
 		String casReferenceId = requestToCache.getCasReferenceId();
 		try
 		{
-//			waitUntilReadyToSendMessage(AsynchAEMessage.Process);
-
 			if (!running)
 			{
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO", new Object[] { "Asynchronous Client is Stopping" });
@@ -550,7 +548,11 @@
 				pendingMessageList.add(msg);
 				pendingMessageList.notifyAll();
 			}
-			howManySent++;
+			
+			synchronized (cpcGate)
+			{
+				howManySent++;
+			}
 		}
 		catch (Exception e)
 		{
@@ -778,16 +780,9 @@
 		finally
 		{
 			removeFromCache(casReferenceId);
-			if (howManyRecvd == howManySent)
+			synchronized (cpcGate)
 			{
-				synchronized (cpcGate)
-				{
-					cpcGate.notifyAll();
-				}
-			}
-			if (howManyRecvd == howManySent)
-			{
-				synchronized (cpcGate)
+				if (howManyRecvd == howManySent)
 				{
 					cpcGate.notifyAll();
 				}
@@ -822,7 +817,7 @@
 		if ( doNotify )
 		{
 			ProcessTrace pt = new ProcessTrace_impl();
-			UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+			UimaASProcessStatusImpl status = null; //  new UimaASProcessStatusImpl(pt);
 			String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
 			if ( casReferenceId != null && casReferenceId.trim().length() > 0)
 			{
@@ -1039,17 +1034,6 @@
 			}
 			else if (AsynchAEMessage.Process == command)
 			{
-/*				
-				if (receiveWindow > 0)
-				{
-					synchronized (gater)
-					{
-						howManyBeforeReplySeen--;
-						gater.notifyAll();
-					}
-
-				}
-*/				
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINEST", new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
 				
 				String casReferenceId = 
@@ -1110,7 +1094,6 @@
 							synchronized( threadMonitor.getMonitor() )
 							{
 								threadMonitor.setWasSignaled();
-								cachedRequest.setReceivedProcessCasReply();
 								threadMonitor.getMonitor().notifyAll();
 							}
 						}
@@ -1128,16 +1111,6 @@
 			e.printStackTrace();
 		}
 	}
-/*
-	private void sendRequestToReleaseCas( String aCasReferenceId, Endpoint anEndpoint ) throws Exception
-	{
-			MessageProducer msgProducer =
-				lookupProducerForEndpoint( anEndpoint );
-			TextMessage tm = producerSession.createTextMessage("");
-			setReleaseCASMessage(tm, aCasReferenceId);
-			msgProducer.send(tm);
-	}
-*/	
 	/**
 	 * Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
 	 */
@@ -1155,7 +1128,6 @@
 		{
 			throw new ResourceProcessException( new Exception("Uima EE Client Not In Running State"));
 		}
-		String casReferenceId = null;
 			// keep handle to CAS, we'll deserialize into this same CAS later
 			sendAndReceiveCAS = aCAS;
 			
@@ -1174,7 +1146,7 @@
 			ClientRequest cachedRequest = produceNewClientRequestObject();
 			cachedRequest.setSynchronousInvocation();
 			// send CAS. This call does not block. Instead we will block the sending thread below.
-			casReferenceId = sendCAS(aCAS, cachedRequest);
+			sendCAS(aCAS, cachedRequest);
 			if ( threadMonitor != null && threadMonitor.getMonitor() != null)
 			{
 				//	Block here
@@ -1263,48 +1235,46 @@
 		  }
 		  //  Store the total latency for this CAS. The departure time is set right before the CAS
 		  //  is sent to a service.
-      //TODO set to process timeout value in nanos
 		  cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
 
-      // mark timeout exception
-      cachedRequest.setTimeoutException();
+		  // mark timeout exception
+		  cachedRequest.setTimeoutException();
 
-      if ( cachedRequest.isSynchronousInvocation() )
-      {
-        //  Signal a thread that we received a reply, if in the map
-        if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
-        {
-          ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
-          //  Unblock the sending thread so that it can complete processing with an error
-          synchronized( threadMonitor.getMonitor() )
-          {
-            threadMonitor.setWasSignaled();
-            cachedRequest.setReceivedProcessCasReply(); // should not be needed
-            threadMonitor.getMonitor().notifyAll();
-          }
-        }
-      }
-      else {
-        // notify the application listener with the error
-        exc = new UimaASProcessCasTimeout();
-        status.addEventStatus("Process", "Failed", exc);
-        notifyListeners(aCAS, status, AsynchAEMessage.Process);
-      }
-      cachedRequest.removeEntry(casReferenceId);
-
-      synchronized (gater) {
-        if (howManyBeforeReplySeen > 0) {
-          howManyBeforeReplySeen--;
-        }
-        //TODO what is being notified???
-        gater.notifyAll();
-      }
-      howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
-			break;
-		}
-    
+		  if ( cachedRequest.isSynchronousInvocation() )
+		  {
+			  //  Signal a thread that we received a reply, if in the map
+			  if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+			  {
+				  ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+				  //  	Unblock the sending thread so that it can complete processing with an error
+				  synchronized( threadMonitor.getMonitor() )
+				  {
+					  threadMonitor.setWasSignaled();
+					  threadMonitor.getMonitor().notifyAll();
+				  }
+			  }
+		  }
+		  else 
+		  {
+			  // notify the application listener with the error
+			  exc = new UimaASProcessCasTimeout();
+			  status.addEventStatus("Process", "Failed", exc);
+			  notifyListeners(aCAS, status, AsynchAEMessage.Process);
+		  }
+		  cachedRequest.removeEntry(casReferenceId);
 
-	}
+		  synchronized (gater) 
+		  {
+			  if (howManyBeforeReplySeen > 0) 
+			  {
+				  howManyBeforeReplySeen--;
+			  }
+			  gater.notifyAll();
+			  howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
+		  }
+		  break;
+		}  // case
+ 	}
 
 	public class ClientRequest
 	{
@@ -1334,7 +1304,7 @@
 
 		private String endpoint;
 
-		private boolean receivedProcessCasReply = false;
+//		private boolean receivedProcessCasReply = false;
 		
 		private long threadId=-1;
 		
@@ -1437,10 +1407,6 @@
 		{
 			return threadId;
 		}
-		public void setReceivedProcessCasReply()
-		{
-			receivedProcessCasReply = true;
-		}
 		public void setMetadataTimeout( int aTimeout )
 		{
 			metadataTimeout = aTimeout;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java?rev=662564&r1=662563&r2=662564&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/jms/error/handler/JMSExceptionHandler.java Mon Jun  2 14:07:17 2008
@@ -81,7 +81,6 @@
 		System.out.println("Handling JMS Connect Exception Due To::"+exception.getLocalizedMessage());
 		System.out.println("Exception Cause::"+exception.getClass().getName()+":::Message::"+exception.getLocalizedMessage());
 		
-//		String casReferenceId = (String)anErrorContext.get(AsynchAEMessage.CasReferenceId);
 		String casReferenceId = (String)anErrorContext.get(AsynchAEMessage.CasReference);
 		Endpoint endpoint = (Endpoint)anErrorContext.get(AsynchAEMessage.Endpoint);
 		
@@ -94,11 +93,14 @@
 			try
 			{
 				entry = aController.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+				if ( endpoint.isRemote() && entry != null )
+				{
+					aController.dropCAS(casReferenceId, true );
+				}
 			}
-			catch( AsynchAEException e) {}
-			if ( endpoint.isRemote() && entry != null )
+			catch( AsynchAEException e) 
 			{
-				aController.dropCAS(casReferenceId, true );
+				System.out.println("Cas:"+casReferenceId+" Not Found In the Cache.");
 			}
 		}
 	}