You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/12 16:29:05 UTC

svn commit: r733786 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client: BaseUIMAAsynchronousEngineCommon_impl.java ClientServiceDelegate.java

Author: cwiklik
Date: Mon Jan 12 07:28:58 2009
New Revision: 733786

URL: http://svn.apache.org/viewvc?rev=733786&view=rev
Log:
UIMA-1133 Modified client API to use one timer to time requests.

Added:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=733786&r1=733785&r2=733786&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Jan 12 07:28:58 2009
@@ -82,6 +82,7 @@
 import org.apache.uima.util.impl.ProcessTrace_impl;
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.delegate.Delegate;
 
 public abstract class BaseUIMAAsynchronousEngineCommon_impl 
 implements UimaAsynchronousEngine, MessageListener
@@ -162,6 +163,8 @@
 	
   private UimaSerializer uimaSerializer = new UimaSerializer();
 
+  protected ClientServiceDelegate serviceDelegate = null;
+  
 	protected List pendingMessageList = new ArrayList();
 	protected volatile boolean producerInitialized;
 	abstract public String getEndPointName() throws Exception;
@@ -513,32 +516,6 @@
 			throw new ResourceProcessException(e);
 		}
 	}
-/*
-	protected void waitUntilReadyToSendMessage(int aCommand)
-	{
-		if (receiveWindow > 0)
-		{
-			if (howManyBeforeReplySeen > 0 && howManyBeforeReplySeen % receiveWindow == 0)
-			{
-				synchronized (gater)
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "waitUntilReadyToSendMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_blocking_on_semaphore_FINEST", new Object[] { "Gater" });
-					try
-					{
-						// This monitor is dedicated to single purpose event.
-						gater.wait();
-					}
-					catch (Exception e)
-					{
-					}
-				}
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "waitUntilReadyToSendMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_done_blocking_on_semaphore_FINEST", new Object[] { "Gater" });
-			}
-			howManyBeforeReplySeen++;
-		}
-
-	}
-*/	
 	protected ConcurrentHashMap getCache()
 	{
 		return clientCache;
@@ -596,9 +573,13 @@
 
 			if (processTimeout > 0)
 			{
-				requestToCache.startTimer();
+				//requestToCache.startTimer();
+				//  Adds CAS Id to the list of CAS pending reply. It starts timer
+			  //  if the list is empty
 			}
-			synchronized( pendingMessageList )
+      serviceDelegate.addCasToOutstandingList(casReferenceId);
+
+      synchronized( pendingMessageList )
 			{
 				pendingMessageList.add(msg);
 				pendingMessageList.notifyAll();
@@ -848,6 +829,11 @@
 		{
 			return;
 		}
+    if ( serviceDelegate.getCasProcessTimeout() > 0) {
+      serviceDelegate.cancelDelegateTimer();
+    }
+		
+		serviceDelegate.removeCasFromOutstandingList(casReferenceId);
 		if ( message instanceof TextMessage && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
 	    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
 	            new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), message.toString()+((TextMessage) message).getText() });
@@ -861,12 +847,11 @@
 			cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
 			
 			//	Cancel the timer
-			try
-			{
-				cancelTimer(casReferenceId);
-			}
-			catch( Exception e) {}
-
+//			try
+//			{
+//				cancelTimer(casReferenceId);
+//			}
+//			catch( Exception e) {}
 			//	If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
 			//	sent the CAS and process the reply
 			if ( cachedRequest.isSynchronousInvocation() )
@@ -1039,6 +1024,7 @@
          cachedRequest.getCAS().release();
       }
       removeFromCache(casReferenceId);
+      serviceDelegate.removeCasFromOutstandingList(casReferenceId);
       if (howManyRecvd == howManySent)
       {
         synchronized (cpcGate)
@@ -1225,14 +1211,14 @@
 			clientCache.remove(aCasReferenceId);
 		}
 	}
-	private CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
+	protected CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
 	{
 		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
     uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
 		return aCAS;
 	}
 	
-	private CAS deserialize(String aSerializedCAS, CAS aCAS, XmiSerializationSharedData deserSharedData, boolean deltaCas ) throws Exception
+	protected CAS deserialize(String aSerializedCAS, CAS aCAS, XmiSerializationSharedData deserSharedData, boolean deltaCas ) throws Exception
 	{
 		if (deltaCas) {
       uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, deserSharedData.getMaxXmiId(), AllowPreexistingFS.allow);
@@ -1242,7 +1228,7 @@
 		return aCAS;
 	}
 
-  private CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception
+  protected CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception
   {
     CAS cas = cachedRequest.getCAS();
     uimaSerializer.deserializeCasFromBinary(binaryData, cas);
@@ -1523,7 +1509,7 @@
 			  notifyListeners(aCAS, status, AsynchAEMessage.Process);
 		  }
 		  cachedRequest.removeEntry(casReferenceId);
-
+		  serviceDelegate.removeCasFromOutstandingList(casReferenceId);
 		  synchronized (gater) 
 		  {
 			  if (howManyBeforeReplySeen > 0) 
@@ -1532,6 +1518,13 @@
 			  }
 			  gater.notifyAll();
 			  howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
+	      if (howManyRecvd == howManySent)
+	      {
+	        synchronized (cpcGate)
+	        {
+	          cpcGate.notifyAll();
+	        }
+	      }
 		  }
 		  break;
 		}  // case
@@ -1713,7 +1706,9 @@
 		{
 			isRemote = aRemote;
 		}
-
+		public boolean isRemote() {
+		  return isRemote;
+		}
 		public void setCAS(CAS aCAS)
 		{
 			cas = aCAS;
@@ -1733,7 +1728,15 @@
 		  binaryCas = aBinaryCas;
 		  isBinaryCas = true;
 		}
-
+		public boolean isBinaryCAS() {
+		  return isBinaryCas;
+		}
+		public byte[] getBinaryCAS() {
+		  return binaryCas;
+		}
+		public String getXmiCAS() {
+		  return serializedCAS;
+		}
 		public void startTimer()
 		{
 			Date timeToRun = null; 

Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=733786&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Mon Jan 12 07:28:58 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.client;
+
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.error.MessageTimeoutException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import org.apache.uima.cas.CAS;
+
+public class ClientServiceDelegate extends Delegate {
+
+  private BaseUIMAAsynchronousEngineCommon_impl clientUimaAsEngine;
+  private String applicationName = "UimaAsClient";
+  public ClientServiceDelegate(String serviceName, String anApplicationName,  BaseUIMAAsynchronousEngineCommon_impl engine ) {
+    super.delegateKey = serviceName;
+    clientUimaAsEngine = engine;
+    if ( anApplicationName != null && anApplicationName.trim().length() > 0 ) {
+      applicationName = anApplicationName;
+    }
+  }
+  public String getComponentName() {
+    return applicationName;
+  }
+
+  public void handleError(Exception e, ErrorContext errorContext) {
+    String casReferenceId = (String)errorContext.get(AsynchAEMessage.CasReference);
+    ClientRequest cachedRequest = (ClientRequest)clientUimaAsEngine.clientCache.get(casReferenceId);
+    CAS cas = null;
+    try {
+       if (cachedRequest.isRemote()) {
+         cas = cachedRequest.getCAS();
+       }
+       if ( e instanceof MessageTimeoutException) {
+         
+         //  Notifies Listeners and removes ClientRequest instance from the client cache
+         clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(), BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId);
+         clientUimaAsEngine.clientSideJmxStats.incrementProcessTimeoutErrorCount();
+       }
+    }
+    catch (Exception ex) {
+        ex.printStackTrace();
+    }
+    //  Dont release the CAS if synchronous API was used
+    if (cas != null && !cachedRequest.isSynchronousInvocation())
+    {
+      cas.release();
+    }
+  }
+
+}