You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/12 16:29:05 UTC
svn commit: r733786 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client:
BaseUIMAAsynchronousEngineCommon_impl.java ClientServiceDelegate.java
Author: cwiklik
Date: Mon Jan 12 07:28:58 2009
New Revision: 733786
URL: http://svn.apache.org/viewvc?rev=733786&view=rev
Log:
UIMA-1133 Modified client API to use one timer to time requests.
Added:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=733786&r1=733785&r2=733786&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Jan 12 07:28:58 2009
@@ -82,6 +82,7 @@
import org.apache.uima.util.impl.ProcessTrace_impl;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.delegate.Delegate;
public abstract class BaseUIMAAsynchronousEngineCommon_impl
implements UimaAsynchronousEngine, MessageListener
@@ -162,6 +163,8 @@
private UimaSerializer uimaSerializer = new UimaSerializer();
+ protected ClientServiceDelegate serviceDelegate = null;
+
protected List pendingMessageList = new ArrayList();
protected volatile boolean producerInitialized;
abstract public String getEndPointName() throws Exception;
@@ -513,32 +516,6 @@
throw new ResourceProcessException(e);
}
}
-/*
- protected void waitUntilReadyToSendMessage(int aCommand)
- {
- if (receiveWindow > 0)
- {
- if (howManyBeforeReplySeen > 0 && howManyBeforeReplySeen % receiveWindow == 0)
- {
- synchronized (gater)
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "waitUntilReadyToSendMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_blocking_on_semaphore_FINEST", new Object[] { "Gater" });
- try
- {
- // This monitor is dedicated to single purpose event.
- gater.wait();
- }
- catch (Exception e)
- {
- }
- }
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "waitUntilReadyToSendMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_done_blocking_on_semaphore_FINEST", new Object[] { "Gater" });
- }
- howManyBeforeReplySeen++;
- }
-
- }
-*/
protected ConcurrentHashMap getCache()
{
return clientCache;
@@ -596,9 +573,13 @@
if (processTimeout > 0)
{
- requestToCache.startTimer();
+ //requestToCache.startTimer();
+ // Adds CAS Id to the list of CAS pending reply. It starts timer
+ // if the list is empty
}
- synchronized( pendingMessageList )
+ serviceDelegate.addCasToOutstandingList(casReferenceId);
+
+ synchronized( pendingMessageList )
{
pendingMessageList.add(msg);
pendingMessageList.notifyAll();
@@ -848,6 +829,11 @@
{
return;
}
+ if ( serviceDelegate.getCasProcessTimeout() > 0) {
+ serviceDelegate.cancelDelegateTimer();
+ }
+
+ serviceDelegate.removeCasFromOutstandingList(casReferenceId);
if ( message instanceof TextMessage && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), message.toString()+((TextMessage) message).getText() });
@@ -861,12 +847,11 @@
cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
// Cancel the timer
- try
- {
- cancelTimer(casReferenceId);
- }
- catch( Exception e) {}
-
+// try
+// {
+// cancelTimer(casReferenceId);
+// }
+// catch( Exception e) {}
// If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
// sent the CAS and process the reply
if ( cachedRequest.isSynchronousInvocation() )
@@ -1039,6 +1024,7 @@
cachedRequest.getCAS().release();
}
removeFromCache(casReferenceId);
+ serviceDelegate.removeCasFromOutstandingList(casReferenceId);
if (howManyRecvd == howManySent)
{
synchronized (cpcGate)
@@ -1225,14 +1211,14 @@
clientCache.remove(aCasReferenceId);
}
}
- private CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
+ protected CAS deserialize(String aSerializedCAS, CAS aCAS ) throws Exception
{
XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, -1);
return aCAS;
}
- private CAS deserialize(String aSerializedCAS, CAS aCAS, XmiSerializationSharedData deserSharedData, boolean deltaCas ) throws Exception
+ protected CAS deserialize(String aSerializedCAS, CAS aCAS, XmiSerializationSharedData deserSharedData, boolean deltaCas ) throws Exception
{
if (deltaCas) {
uimaSerializer.deserializeCasFromXmi(aSerializedCAS, aCAS, deserSharedData, true, deserSharedData.getMaxXmiId(), AllowPreexistingFS.allow);
@@ -1242,7 +1228,7 @@
return aCAS;
}
- private CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception
+ protected CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception
{
CAS cas = cachedRequest.getCAS();
uimaSerializer.deserializeCasFromBinary(binaryData, cas);
@@ -1523,7 +1509,7 @@
notifyListeners(aCAS, status, AsynchAEMessage.Process);
}
cachedRequest.removeEntry(casReferenceId);
-
+ serviceDelegate.removeCasFromOutstandingList(casReferenceId);
synchronized (gater)
{
if (howManyBeforeReplySeen > 0)
@@ -1532,6 +1518,13 @@
}
gater.notifyAll();
howManyRecvd++; // increment global counter to enable CPC request to be sent when howManySent = howManyRecvd
+ if (howManyRecvd == howManySent)
+ {
+ synchronized (cpcGate)
+ {
+ cpcGate.notifyAll();
+ }
+ }
}
break;
} // case
@@ -1713,7 +1706,9 @@
{
isRemote = aRemote;
}
-
+ public boolean isRemote() {
+ return isRemote;
+ }
public void setCAS(CAS aCAS)
{
cas = aCAS;
@@ -1733,7 +1728,15 @@
binaryCas = aBinaryCas;
isBinaryCas = true;
}
-
+ public boolean isBinaryCAS() {
+ return isBinaryCas;
+ }
+ public byte[] getBinaryCAS() {
+ return binaryCas;
+ }
+ public String getXmiCAS() {
+ return serializedCAS;
+ }
public void startTimer()
{
Date timeToRun = null;
Added: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=733786&view=auto
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (added)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Mon Jan 12 07:28:58 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.adapter.jms.client;
+
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.error.MessageTimeoutException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import org.apache.uima.cas.CAS;
+
+public class ClientServiceDelegate extends Delegate {
+
+ private BaseUIMAAsynchronousEngineCommon_impl clientUimaAsEngine;
+ private String applicationName = "UimaAsClient";
+ public ClientServiceDelegate(String serviceName, String anApplicationName, BaseUIMAAsynchronousEngineCommon_impl engine ) {
+ super.delegateKey = serviceName;
+ clientUimaAsEngine = engine;
+ if ( anApplicationName != null && anApplicationName.trim().length() > 0 ) {
+ applicationName = anApplicationName;
+ }
+ }
+ public String getComponentName() {
+ return applicationName;
+ }
+
+ public void handleError(Exception e, ErrorContext errorContext) {
+ String casReferenceId = (String)errorContext.get(AsynchAEMessage.CasReference);
+ ClientRequest cachedRequest = (ClientRequest)clientUimaAsEngine.clientCache.get(casReferenceId);
+ CAS cas = null;
+ try {
+ if (cachedRequest.isRemote()) {
+ cas = cachedRequest.getCAS();
+ }
+ if ( e instanceof MessageTimeoutException) {
+
+ // Notifies Listeners and removes ClientRequest instance from the client cache
+ clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(), BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId);
+ clientUimaAsEngine.clientSideJmxStats.incrementProcessTimeoutErrorCount();
+ }
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ // Dont release the CAS if synchronous API was used
+ if (cas != null && !cachedRequest.isSynchronousInvocation())
+ {
+ cas.release();
+ }
+ }
+
+}