You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [11/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uim...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/Listener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+public interface Listener extends LifecycleListener {
+ public enum Type {
+ GetMeta, ProcessCAS, FreeCAS, Reply, Target, Unknown
+ }
+
+ public Transport getTransport();
+
+ public Type getType();
+
+ public Object getEndpoint();
+
+ // for delegate reply listener this should be delegate key
+ public String getName();
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/CasPool.java Mon Feb 26 18:54:11 2018
@@ -51,4 +51,18 @@ public interface CasPool {
* @param heapSize the new initial fs heap size
*/
public void setInitialFsHeapSize(int heapSize);
+
+ /**
+ * Returns boolean to indicate if JCasCache should be enabled/disabled
+ *
+ * @return - boolean
+ */
+ public boolean disableJCasCache();
+
+ /**
+ * Disables or enables JCasCache.
+ *
+ * @param disable - true or false
+ */
+ public void disableJCasCache(boolean disable);
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/DelegateConfiguration.java Mon Feb 26 18:54:11 2018
@@ -150,4 +150,17 @@ public interface DelegateConfiguration {
*/
public CollectionProcessCompleteErrorHandlingSettings getCollectionProcessCompleteErrorHandlingSettings();
+ /**
+ * Returns the value of the diableJCasCache
+ * @return true if configured to disable JCasCache. False otherwise
+ */
+ public boolean disableJCasCache();
+
+ /**
+ * Sets the value of the diableJCasCache
+ *
+ * @param disable - true to disable JCasCache. False otherwise
+ */
+ public void disableJCasCache(boolean disable);
+
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/ServiceContext.java Mon Feb 26 18:54:11 2018
@@ -289,5 +289,14 @@ public interface ServiceContext {
* @return the cp c additional action
*/
public Action getCpCAdditionalAction();
-
+ /**
+ * Enables/disables JCas Cache
+ *
+ */
+ public void disableJCasCache(boolean disable);
+ /**
+ * Gets a boolean to indicate if JCas Cache is enabled/disabled
+ *
+ */
+ public boolean disableJCasCache();
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasMultiplierImpl.java Mon Feb 26 18:54:11 2018
@@ -38,7 +38,7 @@ public class CasMultiplierImpl implement
* @param cmt the cmt
*/
protected CasMultiplierImpl(CasMultiplierType cmt ) {
- this(cmt,1,2000000,false);
+ this(cmt,1,2000000,false,false);
}
/**
@@ -49,11 +49,12 @@ public class CasMultiplierImpl implement
* @param initialHeapSize the initial heap size
* @param processParentLast the process parent last
*/
- protected CasMultiplierImpl(CasMultiplierType cmt, int casPoolSize, int initialHeapSize, boolean processParentLast) {
+ protected CasMultiplierImpl(CasMultiplierType cmt, int casPoolSize, int initialHeapSize, boolean processParentLast, boolean disableJCasCache) {
this.cmt = cmt;
setCasPoolSize(casPoolSize);
setInitialFsHeapSize(initialHeapSize);
setProcessParentLast(processParentLast);
+ disableJCasCache(disableJCasCache);
}
/**
@@ -67,6 +68,7 @@ public class CasMultiplierImpl implement
setCasPoolSize(context.getCasPoolSize());
setInitialFsHeapSize(context.getInitialHeapSize());
setProcessParentLast(context.processParentLast());
+ disableJCasCache(context.disableJCasCache());
// if ( props.containsKey(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT)) {
// setAttr(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT,props);
@@ -151,4 +153,12 @@ public int getCasPoolSize() {
Assert.notNull(cmt);
cmt.setProcessParentLast(Boolean.toString(processParentLast));
}
+ public boolean disableJCasCache() {
+ Assert.notNull(cmt);
+ return cmt.getDisableJCasCache();
+ }
+ public void disableJCasCache(boolean disable ) {
+ Assert.notNull(cmt);
+ cmt.setDisableJCasCache(disable);
+ }
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/CasPoolImpl.java Mon Feb 26 18:54:11 2018
@@ -45,6 +45,7 @@ public class CasPoolImpl implements CasP
this.cpt = cpt;
setNumberOfCases(context.getCasPoolSize());
setInitialFsHeapSize(context.getInitialHeapSize());
+ disableJCasCache(context.disableJCasCache());
// if ( props.containsKey(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT)) {
// setAttr(UimaASDeploymentDescriptor.CASPOOL_CAS_COUNT,props);
@@ -99,5 +100,18 @@ public int getNumberOfCases() {
Assert.notNull(cpt);
cpt.setInitialFsHeapSize(heapSize);
}
-
+ /* (non-Javadoc)
+ * @see org.apache.uima.resourceSpecifier.factory.CasPool#disableJCasCache()
+ */
+ public boolean disableJCasCache() {
+ Assert.notNull(cpt);
+ return cpt.getDisableJCasCache();
+ }
+ /* (non-Javadoc)
+ * @see org.apache.uima.resourceSpecifier.factory.CasPool#disableJCasCache(boolean)
+ */
+ public void disableJCasCache(boolean disable) {
+ Assert.notNull(cpt);
+ cpt.setDisableJCasCache(disable);
+ }
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ColocatedDelegateEngineImpl.java Mon Feb 26 18:54:11 2018
@@ -52,7 +52,7 @@ public class ColocatedDelegateEngineImpl
setKey(cdc.getKey());
setReplyQueueScaleup(cdc.getInternalReplyQueueScaleout());
if ( cdc.isCasMultiplier()) {
- cm = new CasMultiplierImpl(dcaet.addNewCasMultiplier(), cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast());
+ cm = new CasMultiplierImpl(dcaet.addNewCasMultiplier(), cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast(),cdc.disableJCasCache());
}
configuration= cdc;
}
@@ -111,7 +111,11 @@ public class ColocatedDelegateEngineImpl
* @see org.apache.uima.resourceSpecifier.factory.ColocatedDelegateEngine#setAsync()
*/
public void setAsync() {
- dcaet.setAsync("true");
+ if ( isAggregate()) {
+ dcaet.setAsync("true");
+ } else {
+ dcaet.setAsync("false");
+ }
}
/* (non-Javadoc)
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/RemoteDelegateEngineImpl.java Mon Feb 26 18:54:11 2018
@@ -113,7 +113,7 @@ public class RemoteDelegateEngineImpl im
* @param context the context
*/
private void addCasMultiplier(CasMultiplierType cmt, DelegateConfiguration cdc, ServiceContext context) {
- cm = new CasMultiplierImpl(cmt, cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast());
+ cm = new CasMultiplierImpl(cmt, cdc.getCasPoolSize(), cdc.getInitialHeapSize(), cdc.processParentLast(), cdc.disableJCasCache());
}
/**
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/resourceSpecifier/factory/impl/ServiceContextImpl.java Mon Feb 26 18:54:11 2018
@@ -84,6 +84,10 @@ public class ServiceContextImpl implemen
/** The cpc additional action. */
private Action cpcAdditionalAction = Action.Terminate;
+
+ /** The disable/enable of JCasCache */
+ private boolean disableJCasCache=true;
+
/**
* This class describes UIMA AS service. The minimal information needed is the service name, description, the AnalysisEngine
* descriptor and the queue name. If not provided, the class uses defaults to fully describe the service.
@@ -373,5 +377,16 @@ public class ServiceContextImpl implemen
public boolean isAsync() {
return async;
}
-
+ /* (non-Javadoc)
+ * @see org.apache.uima.resourceSpecifier.factory.ServiceContext#disableJCasCache(boolean)
+ */
+ public void disableJCasCache(boolean disable) {
+ disableJCasCache = disable;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.uima.resourceSpecifier.factory.ServiceContext#disableJCasCache()
+ */
+ public boolean disableJCasCache() {
+ return disableJCasCache;
+ }
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Mon Feb 26 18:54:11 2018
@@ -40,6 +40,7 @@ import org.apache.uima.adapter.jms.JmsCo
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.jms.error.handler.BrokerConnectionException;
@@ -54,11 +55,11 @@ import org.apache.uima.util.impl.Process
* worker thread terminates when the uima ee client calls doStop() method.
*/
public abstract class BaseMessageSender implements Runnable, MessageSender {
- private static final Class CLASS_NAME = BaseMessageSender.class;
+ private static final Class<?> CLASS_NAME = BaseMessageSender.class;
// A reference to a shared queue where application threads enqueue messages
// to be sent
- protected BlockingQueue<PendingMessage> messageQueue = new LinkedBlockingQueue<PendingMessage>();
+ protected BlockingQueue<PendingMessage> messageQueue = new LinkedBlockingQueue<>();
// Global flag controlling lifecycle of this thread. It will be set to true
// when the
@@ -116,7 +117,7 @@ public abstract class BaseMessageSender
done = true;
// Create an empty message to deliver to the queue that is blocking
- PendingMessage emptyMessage = new PendingMessage(0);
+ PendingMessage emptyMessage = new PendingMessageImpl(0);
messageQueue.add(emptyMessage);
synchronized(emptyMessage) {
@@ -191,7 +192,7 @@ public abstract class BaseMessageSender
if (pm.getMessageType() == AsynchAEMessage.Process) {
// fetch the cache entry for this CAS
ClientRequest cacheEntry = (ClientRequest) engine.getCache().get(
- pm.get(AsynchAEMessage.CasReference));
+ pm.getPropertyAsString(AsynchAEMessage.CasReference));
if ( cacheEntry != null ) {
// We are rejecting any Process requests until connection to broker
// is recovered
@@ -505,12 +506,12 @@ public abstract class BaseMessageSender
break;
case AsynchAEMessage.Process:
- String casReferenceId = (String) aPm.get(AsynchAEMessage.CasReference);
+ String casReferenceId = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
if (engine.getSerialFormat() == SerialFormat.XMI) {
- String serializedCAS = (String) aPm.get(AsynchAEMessage.CAS);
+ String serializedCAS = aPm.getPropertyAsString(AsynchAEMessage.CAS);
engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
} else {
- byte[] serializedCAS = (byte[]) aPm.get(AsynchAEMessage.CAS);
+ byte[] serializedCAS = aPm.getPropertyAsBytesArray(AsynchAEMessage.CAS);
engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
}
@@ -523,15 +524,15 @@ public abstract class BaseMessageSender
break;
case AsynchAEMessage.ReleaseCAS:
- String casRefId = (String) aPm.get(AsynchAEMessage.CasReference);
+ String casRefId = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
String selector =
- (String) aPm.get(AsynchAEMessage.TargetingSelector) ;
+ aPm.getPropertyAsString(AsynchAEMessage.TargetingSelector) ;
engine.setFreeCasMessage(anOutgoingMessage, casRefId, selector);
break;
case AsynchAEMessage.Stop:
- String casRefId2 = (String) aPm.get(AsynchAEMessage.CasReference);
+ String casRefId2 = aPm.getPropertyAsString(AsynchAEMessage.CasReference);
engine.setStopMessage(anOutgoingMessage, casRefId2);
break;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Feb 26 18:54:11 2018
@@ -83,10 +83,15 @@ import org.apache.uima.aae.error.UimaEES
import org.apache.uima.aae.jmx.UimaASClientInfo;
import org.apache.uima.aae.jmx.UimaASClientInfoMBean;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaASClientDirectMessage;
+import org.apache.uima.aae.message.UimaASClientMessage;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.adapter.jms.ConnectionValidator;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.adapter.jms.message.PendingMessageImpl;
+import org.apache.uima.as.client.DirectMessage;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.AllowPreexistingFS;
@@ -173,8 +178,13 @@ public abstract class BaseUIMAAsynchrono
protected AtomicLong totalCasRequestsSentBetweenCpCs = new AtomicLong();
- protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
+// protected ConcurrentHashMap springContainerRegistry = new ConcurrentHashMap();
+ protected Map<String, UimaASService> serviceRegistry =
+ new ConcurrentHashMap<>();
+
+ abstract protected UimaASService getServiceReference();
+
protected MessageConsumer consumer = null;
protected SerialFormat serialFormat = SerialFormat.XMI;
@@ -199,7 +209,7 @@ public abstract class BaseUIMAAsynchrono
private Object casProducerMux = new Object();
- protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<PendingMessage>();
+ protected BlockingQueue<PendingMessage> pendingMessageQueue = new LinkedBlockingQueue<>();
// Create Semaphore that will signal when the producer object is initialized
protected Semaphore producerSemaphore = new Semaphore(1);
@@ -250,7 +260,7 @@ public abstract class BaseUIMAAsynchrono
abstract protected void setStopMessage(Message msg, String aCasReferenceId) throws Exception;
- abstract protected void setCPCMessage(Message msg) throws Exception;
+ abstract public void setCPCMessage(Message msg) throws Exception;
abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
@@ -270,12 +280,28 @@ public abstract class BaseUIMAAsynchrono
abstract protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception;
+ abstract protected void beforeProcessReply(String casReferenceId);
+
+ abstract protected boolean isServiceRemote();
+
// enables/disable timer per CAS. Defaul is to use single timer for
// all outstanding CASes
- protected volatile boolean timerPerCAS=false;
+ public volatile boolean timerPerCAS=false;
//abstract protected String getBrokerURI();
+
+ private long replyCount=1;
+
+ protected Transport transport;
+ public boolean isRunning() {
+ return running;
+ }
+
+ public ClientServiceDelegate getServiceDelegate() {
+ return serviceDelegate;
+ }
+
protected void setBrokeryURI(String brokerURI ) {
this.brokerURI = brokerURI;
}
@@ -381,6 +407,8 @@ public abstract class BaseUIMAAsynchrono
}
private void addMessage(PendingMessage msg) {
+ System.out.println("Client addMessage() - adding message to pendingMessageQueue");
+
pendingMessageQueue.add(msg);
}
@@ -446,10 +474,10 @@ public abstract class BaseUIMAAsynchrono
clientCache.put(uniqueIdentifier, requestToCache);
- PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
+ PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.CollectionProcessComplete);
if (cpcTimeout > 0) {
requestToCache.startTimer();
- msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
+ msg.addProperty(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -492,9 +520,16 @@ public abstract class BaseUIMAAsynchrono
Iterator it = clientCache.keySet().iterator();
while (it.hasNext()) {
ClientRequest entry = clientCache.get((String) it.next());
- if (entry != null && entry.getCAS() != null) {
- entry.getCAS().release();
+ System.out.println(".......... Client.releaseCacheEntries() - Client.hashCode()-"+this.hashCode()+" has "+clientCache.size()+ " entries remaining");
+ if ( entry.isThisClientTheCasOwner(this) ) {
+ if (entry != null && entry.getCAS() != null) {
+ System.out.println("............. Client is the owner of CAS:"+entry.getCasReferenceId()+" - CAS release is permitted");
+ entry.getCAS().release();
+ }
}
+// if (entry != null && entry.getCAS() != null) {
+// entry.getCAS().release();
+// }
}
}
@@ -569,13 +604,13 @@ public abstract class BaseUIMAAsynchrono
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO",
new Object[] {});
}
- for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();) {
+ for (Iterator i = serviceRegistry.entrySet().iterator(); i.hasNext();) {
Map.Entry entry = (Map.Entry) i.next();
Object key = entry.getKey();
undeploy((String) key);
}
asynchManager = null;
- springContainerRegistry.clear();
+ serviceRegistry.clear();
listeners.clear();
clientCache.clear();
threadQueue.clear();
@@ -773,8 +808,9 @@ public abstract class BaseUIMAAsynchrono
}
protected void sendMetaRequest() throws Exception {
- PendingMessage msg = new PendingMessage(AsynchAEMessage.GetMeta);
- ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this); // , metadataTimeout);
+ PendingMessage msg = new PendingMessageImpl(AsynchAEMessage.GetMeta);
+ ClientRequest requestToCache = new ClientRequest(uniqueIdentifier, this);
+
requestToCache.setIsRemote(remoteService);
requestToCache.setMetaRequest(true);
requestToCache.setMetadataTimeout(metadataTimeout);
@@ -849,7 +885,7 @@ public abstract class BaseUIMAAsynchrono
}
// If the CR is done, enter a polling loop waiting for outstanding CASes to return
// from a service
- if (hasNext == false ) {
+ if (!hasNext) {
Object mObject = new Object();
// if client is running and there are outstanding CASe go sleep for awhile and
// try again, until all CASes come back from a service
@@ -870,7 +906,38 @@ public abstract class BaseUIMAAsynchrono
protected ConcurrentHashMap<String, ClientRequest> getCache() {
return clientCache;
}
-
+ private void prepareRemoteRequest(PendingMessage msg, ClientRequest requestToCache) throws Exception {
+ long t1 = System.nanoTime();
+ switch (serialFormat) {
+ case XMI:
+ XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
+ String serializedCAS = serializeCAS(requestToCache.getCAS(), serSharedData);
+ msg.addProperty(AsynchAEMessage.CAS,serializedCAS);
+ if (remoteService) { // always true 5/2013
+ // Store the serialized CAS in case the timeout occurs and need to send the
+ // the offending CAS to listeners for reporting
+ requestToCache.setCAS(serializedCAS);
+ requestToCache.setXmiSerializationSharedData(serSharedData);
+ }
+ break;
+ case BINARY:
+ byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(requestToCache.getCAS());
+ msg.addProperty(AsynchAEMessage.CAS,serializedBinaryCAS);
+ break;
+ case COMPRESSED_FILTERED:
+ // can't use uimaserializer directly - project doesn't have ref to this one
+ // for storing the reuse info
+ BinaryCasSerDes6 bcs = new BinaryCasSerDes6(requestToCache.getCAS(), this.getRemoteTypeSystem());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ bcs.serialize(baos);
+ requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());
+ msg.addProperty(AsynchAEMessage.CAS,baos.toByteArray());
+ break;
+ default:
+ throw new UIMARuntimeException(new Exception("Internal Error - Unsupported Serialization Format:"+serialFormat));
+ }
+ requestToCache.setSerializationTime(System.nanoTime() - t1);
+ }
/**
* Sends a given CAS for analysis to the UIMA EE Service.
*
@@ -895,104 +962,37 @@ public abstract class BaseUIMAAsynchrono
}
return null;
}
-
- clientCache.put(casReferenceId, requestToCache);
- PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
- long t1 = System.nanoTime();
- switch (serialFormat) {
- case XMI:
- XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
- String serializedCAS = serializeCAS(aCAS, serSharedData);
- msg.put(AsynchAEMessage.CAS, serializedCAS);
- if (remoteService) { // always true 5/2013
- // Store the serialized CAS in case the timeout occurs and need to send the
- // the offending CAS to listeners for reporting
- requestToCache.setCAS(serializedCAS);
- requestToCache.setXmiSerializationSharedData(serSharedData);
- }
- break;
- case BINARY:
- byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(aCAS);
- msg.put(AsynchAEMessage.CAS, serializedBinaryCAS);
- break;
- case COMPRESSED_FILTERED:
- // can't use uimaserializer directly - project doesn't have ref to this one
- // for storing the reuse info
- BinaryCasSerDes6 bcs = new BinaryCasSerDes6(aCAS, this.getRemoteTypeSystem());
- ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
- bcs.serialize(baos);
- requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());
- msg.put(AsynchAEMessage.CAS, baos.toByteArray());
- break;
- default:
- throw new UIMARuntimeException(new Exception("Internal Error"));
+ // Save the CAS in the cache entry
+ requestToCache.setCAS(aCAS);
+ // add current work item to the client cache
+ if ( !clientCache.containsKey(casReferenceId)) {
+ clientCache.put(requestToCache.getCasReferenceId(), requestToCache);
}
+
+ // pending message is an object which will be added to the work queue for dispatching
+ PendingMessage msg =
+ new PendingMessageImpl(AsynchAEMessage.Process);
+
+ msg.addProperty(AsynchAEMessage.CasReference, requestToCache.getCasReferenceId());
- requestToCache.setCAS(aCAS);
-
- requestToCache.setSerializationTime(System.nanoTime() - t1);
- msg.put(AsynchAEMessage.CasReference, casReferenceId);
- requestToCache.setIsRemote(remoteService);
+ if ( isServiceRemote() ) {
+ prepareRemoteRequest(msg, requestToCache);
+ requestToCache.setIsRemote(true);
+ } else {
+ msg.addProperty(AsynchAEMessage.CAS, aCAS);
+ }
requestToCache.setEndpoint(getEndPointName());
requestToCache.setProcessTimeout(processTimeout);
requestToCache.clearTimeoutException();
// The sendCAS() method is synchronized no need to synchronize the code below
if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
- SharedConnection sharedConnection = lookupConnection(getBrokerURI());
-
- // Send Ping to service as getMeta request
- if ( sharedConnection != null && !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) {
- serviceDelegate.setAwaitingPingReply();
- // Add the cas to a list of CASes pending reply. Also start the timer if necessary
- // serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
-
- // since the service is in time out state, we dont send CASes to it just yet. Instead, place
- // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
- // arrives.
- serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
- if ( cpcReadySemaphore.availablePermits() > 0 ) {
- acquireCpcReadySemaphore();
- }
-
- // Send PING Request to check delegate's availability
- sendMetaRequest();
- // @@@@@@@@@@@@@@@ Changed on 4/20 serviceDelegate.cancelDelegateTimer();
- // Start a timer for GetMeta ping and associate a cas id
- // with this timer. The delegate is currently in a timed out
- // state due to a timeout on a CAS with a given casReferenceId.
- //
- serviceDelegate.startGetMetaRequestTimer(casReferenceId);
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
- new Object[] { serviceDelegate.getKey() });
+ if ( remoteService ) {
+ // handleDelegateTimeout(requestToCache, aCAS);
+ return requestToCache.getCasReferenceId();
}
- return casReferenceId;
- } else {
- if ( !requestToCache.isSynchronousInvocation() && !sharedConnection.isOpen() ) {
- Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost");
- handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache, true);
- return casReferenceId;
- } else {
- // Add to the outstanding list.
- // serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
- // since the service is in time out state, we dont send CASes to it just yet. Instead, place
- // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
- // arrives.
- serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
- return casReferenceId;
- }
- }
}
- SharedConnection sharedConnection = lookupConnection(getBrokerURI());
- if ( sharedConnection != null && !sharedConnection.isOpen() ) {
- if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) {
- aCAS.release();
- }
- throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"));
- }
+
// Incremented number of outstanding CASes sent to a service. When a reply comes
// this counter is decremented
outstandingCasRequests.incrementAndGet();
@@ -1001,14 +1001,22 @@ public abstract class BaseUIMAAsynchrono
totalCasRequestsSentBetweenCpCs.incrementAndGet();
// Add message to the pending queue
addMessage(msg);
+
} catch (ResourceProcessException e) {
- clientCache.remove(casReferenceId);
+ if ( clientCache.containsKey(requestToCache.getCasReferenceId())) {
+ clientCache.remove(requestToCache.getCasReferenceId());
+ }
throw e;
} catch (Exception e) {
- clientCache.remove(casReferenceId);
+ if ( clientCache.containsKey(requestToCache.getCasReferenceId())) {
+ clientCache.remove(requestToCache.getCasReferenceId());
+ }
+
+ //clientCache.remove(casReferenceId);
throw new ResourceProcessException(e);
}
- return casReferenceId;
+ return requestToCache.getCasReferenceId();
+
}
}
@@ -1029,6 +1037,8 @@ public abstract class BaseUIMAAsynchrono
private ClientRequest produceNewClientRequestObject() {
String casReferenceId = idGenerator.nextId();
+
+ System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>> CLIENT ABOUT TO SEND CAS:"+casReferenceId);
return new ClientRequest(casReferenceId, this);
}
@@ -1056,6 +1066,8 @@ public abstract class BaseUIMAAsynchrono
*/
protected void handleCollectionProcessCompleteReply(Message message) throws Exception {
int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ System.out.println("In handleCollectionProcessCompleteReply() ...");
+
try {
if (AsynchAEMessage.Exception == payload) {
ProcessTrace pt = new ProcessTrace_impl();
@@ -1087,7 +1099,228 @@ public abstract class BaseUIMAAsynchrono
cpcReplySemaphore.release();
}
}
+
+ protected void handleCollectionProcessCompleteReply(UimaASClientMessage message) throws Exception {
+ int payload = message.payload();
+ System.out.println("In handleCollectionProcessCompleteReply() ...");
+ try {
+ if (AsynchAEMessage.Exception == payload) {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ Exception exception = retrieveExceptionFromMessage(message);
+
+ status.addEventStatus("CpC", "Failed", exception);
+ notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleCollectionProcessCompleteReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_exception_msg_INFO",
+ new Object[] { message.messageFrom(),
+ getBrokerURI(),
+ message.casReferenceId(), exception });
+ }
+ } else {
+ // After receiving CPC reply there may be cleanup to do. Delegate this
+ // to platform specific implementation (ActiveMQ or WAS)
+ cleanup();
+ }
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ // Release the semaphore acquired in collectionProcessingComplete()
+ cpcReplySemaphore.release();
+ }
+ }
+ private Exception retrieveExceptionFromMessage(UimaASClientMessage message) throws Exception {
+ Exception exception = null;
+ try {
+ if ( message.payload() == AsynchAEMessage.BinaryPayload ) {
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ // Deserialize as Exception TBD !!!!!!!!!!!!!!!!!!1
+ exception = (Exception) ((ObjectMessage) message).getObject();
+
+ } else {
+ exception = new UimaEEServiceException(((TextMessage) message).getText());
+ }
+/*
+ if (message instanceof ObjectMessage
+ && ((ObjectMessage) message).getObject() instanceof Exception) {
+ exception = (Exception) ((ObjectMessage) message).getObject();
+ } else if (message instanceof TextMessage) {
+ exception = new UimaEEServiceException(((TextMessage) message).getText());
+ }
+ */
+ } catch( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "retrieveExceptionFromMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ exception = new UimaEEServiceException("UIMA AS client is unable to de-serialize Exception from a remote service",e);
+ }
+ return exception;
+ }
+ private void handleProcessReplyFromSynchronousCall(ClientRequest cachedRequest, UimaASClientMessage message)
+ throws Exception {
+ // Save reply message in the cache
+ cachedRequest.setMessage(message);
+ wakeUpSendThread(cachedRequest);
+ }
+
+ /**
+ * Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
+ *
+ * @param message
+ * - jms message containing serialized ResourceMetaData
+ *
+ * @throws Exception
+ */
+
+ protected void handleMetadataReply(UimaASClientMessage message) throws Exception {
+ serviceDelegate.cancelDelegateGetMetaTimer();
+ serviceDelegate.setState(Delegate.OK_STATE);
+ // check if the reply msg contains replyTo destination. It will be
+ // added by the Cas Multiplier to the getMeta reply
+
+ /*
+ * !!!!!!!!!!!!!!!!!!!!!!! ENABLE CODE BELOW IN JMS SPECIFIC IMPL
+ !!!!!!!!!!!!!!!!!!!!!!!!! MOVE THIS CODE TO JMS SPECIFIC CODE
+ if (message.replyTo() != null) {
+ serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
+ }
+ */
+
+ // Check if this is a reply for a Ping sent in response to a timeout
+ if (serviceDelegate.isAwaitingPingReply()) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleMetadataReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_rcvd_ping_reply__INFO",
+ new Object[] {
+ message.messageFrom(),
+ message.serverIP()});
+ }
+ // reset the state of the service. The client received its ping reply
+ serviceDelegate.resetAwaitingPingReply();
+ String casReferenceId = null;
+ if (serviceDelegate.getCasPendingReplyListSize() > 0 || serviceDelegate.getCasPendingDispatchListSize() > 0) {
+ serviceDelegate.restartTimerForOldestCasInOutstandingList();
+ // We got a reply to GetMeta ping. Send all CASes that have been
+ // placed on the pending dispatch queue to a service.
+ while( serviceDelegate.getState()==Delegate.OK_STATE && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null ) {
+ ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ if (cachedRequest != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleMetadataReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_dispatch_delayed_cas__INFO",
+ new Object[] { casReferenceId, String.valueOf(cachedRequest.cas.hashCode())});
+ }
+ sendCAS(cachedRequest.getCAS(), cachedRequest, null);
+ }
+ }
+ } else {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ notifyListeners(null, status, AsynchAEMessage.GetMeta);
+ }
+ // Handled Ping reply
+ return;
+ }
+ int payload = message.payload();
+ removeFromCache(uniqueIdentifier);
+
+ try {
+ if (AsynchAEMessage.Exception == payload) {
+ ProcessTrace pt = new ProcessTrace_impl();
+ UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
+ Exception exception = retrieveExceptionFromMessage(message);
+ clientSideJmxStats.incrementMetaErrorCount();
+ status.addEventStatus("GetMeta", "Failed", exception);
+ notifyListeners(null, status, AsynchAEMessage.GetMeta);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleMetadataReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_exception_msg_INFO",
+ new Object[] { message.messageFrom(),
+ getBrokerURI(),
+ message.casReferenceId(), exception });
+ }
+ abort = true;
+ initialized = false;
+ } else {
+ // Check serialization supported by the service against client configuration.
+ // If the client is configured to use Binary serialization *but* the service
+ // doesnt support it, change the client serialization to xmi. Old services will
+ // not return in a reply the type of serialization supported which implies "xmi".
+ // New services *always* return "binary" or "compressedBinaryXXX"
+ // as a default serialization. The client
+ // however may still want to serialize messages using xmi though.
+ if (!message.serializationSpecified()) {
+ // Dealing with an old service here, check if there is a mismatch with the
+ // client configuration. If the client is configured with binary serialization
+ // override this and change serialization to "xmi".
+ if (getSerialFormat() != SerialFormat.XMI) {
+ // Override configured serialization
+ setSerialFormat(SerialFormat.XMI);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
+ }
+ } else {
+ //final int c = message.serialization();
+ if (getSerialFormat() != SerialFormat.XMI) {
+ // don't override if XMI - because the remote may have different type system
+ setSerialFormat((message.serializationMethod() == AsynchAEMessage.XMI_SERIALIZATION) ? SerialFormat.XMI :
+ (message.serializationMethod() == AsynchAEMessage.BINARY_SERIALIZATION) ? SerialFormat.BINARY :
+ SerialFormat.COMPRESSED_FILTERED);
+ }
+ }
+ /*
+ String meta = ((TextMessage) message).getText();
+ System.out.println(meta);
+ ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes());
+ XMLInputSource in1 = new XMLInputSource(bis, null);
+ // Adam - store ResouceMetaData in field so we can return it from getMetaData().
+ resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
+ .parseResourceMetaData(in1);
+ */
+ resourceMetadata = (ProcessingResourceMetaData) message.asObject(AsynchAEMessage.AEMetadata);
+ // if remote delegate, save type system
+ if (brokerURI != null && !brokerURI.startsWith("vm:")) { // test if remote
+ setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
+ }
+ casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_handling_meta_reply_FINEST",
+ new Object[] { message.messageFrom(), resourceMetadata });
+ }
+ // check the state of the client
+ if ( running && asynchManager != null ) {
+ // Merge the metadata only if the client is still running
+ asynchManager.addMetadata(resourceMetadata);
+ }
+ }
+
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ getMetaSemaphore.release();
+ }
+ }
/**
* Handles response to GetMeta Request. Deserializes ResourceMetaData and initializes CasManager.
*
@@ -1208,7 +1441,7 @@ public abstract class BaseUIMAAsynchrono
resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
.parseResourceMetaData(in1);
// if remote delegate, save type system
- if (!brokerURI.startsWith("vm:")) { // test if remote
+ if (brokerURI != null && !brokerURI.startsWith("vm:")) { // test if remote
setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
}
casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
@@ -1411,7 +1644,150 @@ public abstract class BaseUIMAAsynchrono
}
}
-
+ /**
+ * Handles response to Process CAS request. If the message originated in a service that is running
+ * in a separate jvm (remote), deserialize the CAS and notify the application of the completed
+ * analysis via application listener.
+ *
+ * @param message
+ * - jms message containing serialized CAS
+ *
+ * @throws Exception
+ */
+ protected void handleProcessReply(UimaASClientMessage message, boolean doNotify, ProcessTrace pt)
+ throws Exception {
+ if (!running) {
+ return;
+ }
+ int payload = -1;
+ String casReferenceId = message.asString(AsynchAEMessage.CasReference);
+
+ // beforeProcessReply(casReferenceId);
+
+ // Determine the type of payload in the message (XMI,Cas Reference,Exception,etc)
+ if (message.contains(AsynchAEMessage.Payload)) {
+ payload = message.asInt(AsynchAEMessage.Payload);
+ }
+ // Fetch entry from the client cache for a cas id returned from the service
+ // The client cache maintains an entry for every outstanding CAS sent to the
+ // service.
+ ClientRequest cachedRequest = null;
+
+ if (casReferenceId != null) {
+
+ cachedRequest = clientCache.get(casReferenceId);
+ if (cachedRequest != null && casReferenceId.equals(cachedRequest.getCasReferenceId())) {
+ // invoke 2nd callback to the application if necessary. Makes sure the callback is done once
+ notifyApplication(cachedRequest,
+ pt,
+ casReferenceId,
+ message.asString(AsynchAEMessage.InputCasReference),
+ message.asString(AsynchAEMessage.ServerIP),
+ message.asString(AsynchAEMessage.UimaASProcessPID));
+
+ // Received a reply, decrement number of outstanding CASes. An outstanding CAS is one
+ // for which we have not received a reply.
+ decrementOutstandingCasCounter();
+ }
+
+ // turn off Process timer and remove CAS from outstanding (waiting for reply) list
+ serviceDelegate.removeCasFromOutstandingList(casReferenceId);
+
+
+ /*
+ // Below is for debugging phase only. REMOVE BEFORE COMMIT!!!!!!!!!!!!!
+ CAS cas = cachedRequest.getCAS();
+ FSIterator<AnnotationFS> it =
+ cas.getAnnotationIndex().iterator();
+ while( it.hasNext()) {
+ AnnotationFS afs = it.next();
+ System.out.println("..... Feature Covered Text:"+afs.getCoveredText());
+ }
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ */
+ }
+ if (AsynchAEMessage.Exception == payload) {
+ Object exception = message.asObject(AsynchAEMessage.ErrorCause);
+ if ( exception != null && exception instanceof Exception ) {
+ String parentCasId = message.asString(AsynchAEMessage.InputCasReference);
+ handleException((Exception)exception, parentCasId, cachedRequest, true);
+ }
+ return;
+ }
+ // cachedRequest is only null if we are receiving child CASes from a
+ // Cas Multiplier. Otherwise, we drop the message as it is out of band
+ if ( cachedRequest == null && !casMultiplierDelegate ) {
+ // most likely a reply came in after the thread was interrupted
+ return;
+ }
+
+
+ // If the Cas Reference id not in the message check if the message contains an
+ // exception and if so, handle the exception and return.
+ if (casReferenceId == null) {
+ return;
+ }
+
+ if (message instanceof TextMessage
+ && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "handleProcessReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_handling_process_reply_FINEST",
+ new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+ message.asString(AsynchAEMessage.CasReference),
+ message.toString() + ((TextMessage) message).getText() });
+ }
+
+ if (cachedRequest != null) {
+ beforeProcessReply(casReferenceId);
+
+ // Store the total latency for this CAS. The departure time is set right before the CAS
+ // is sent to a service.
+ cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cas_reply_rcvd_FINE", new Object[] { casReferenceId, String.valueOf(cachedRequest.getCAS().hashCode())});
+ }
+
+ // If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
+ // sent the CAS and process the reply
+ if (cachedRequest.isSynchronousInvocation()) {
+ handleProcessReplyFromSynchronousCall(cachedRequest, message);
+ System.out.println("handleProcessReply() - handling process reply for sync call");
+ } else {
+// deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, doNotify);
+ System.out.println("handleProcessReply() - handling process reply for async call #"+replyCount++);
+ completeProcessingReply( doNotify, message, pt);
+
+ }
+ } else if (message.contains(AsynchAEMessage.InputCasReference)) {
+ int command = message.asInt(AsynchAEMessage.Command);
+ if (AsynchAEMessage.ServiceInfo != command) {
+ System.out.println("handleProcessReply() - handling process reply for cas multiplier");
+
+ handleProcessReplyFromCasMultiplier(message, casReferenceId, payload);
+
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ // Most likely expired message. Already handled as timeout. Discard the message and move on
+ // to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleProcessReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+ message.asString(AsynchAEMessage.CasReference) });
+ }
+ }
+
+ }
/**
* Handles response to Process CAS request. If the message originated in a service that is running
* in a separate jvm (remote), deserialize the CAS and notify the application of the completed
@@ -1424,6 +1800,8 @@ public abstract class BaseUIMAAsynchrono
*/
protected void handleProcessReply(Message message, boolean doNotify, ProcessTrace pt)
throws Exception {
+ System.out.println(".............. Client.handleProcessReply() 1");
+
if (!running) {
return;
}
@@ -1522,6 +1900,79 @@ public abstract class BaseUIMAAsynchrono
}
}
}
+ private void handleProcessReplyFromCasMultiplier(UimaASClientMessage message, String casReferenceId,
+ int payload ) throws Exception {
+ // Check if the message contains a CAS that was generated by a Cas Multiplier. If so,
+ // verify that the message also includes an input CAS id and that such input CAS id
+ // exists in the client's cache.
+ // Fetch the input CAS Reference Id from which the CAS being processed was generated from
+ String inputCasReferenceId = message.asString(AsynchAEMessage.InputCasReference);
+ // Fetch the destination for Free CAS notification
+ @SuppressWarnings("unchecked")
+ BlockingQueue<DirectMessage> releaseCASQueue =
+ ( BlockingQueue<DirectMessage>) message.asObject(AsynchAEMessage.FreeCASQueue);
+ // send request to a remote CAS multiplier to release a CAS
+ getServiceReference().releaseCAS(casReferenceId, releaseCASQueue);
+
+ System.out.println("............ Client Sent ReleaseCAS Request for CAS:"+casReferenceId);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "handleProcessReplyFromCasMultiplier",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_sending_release_cas_FINEST",
+ new Object[] { releaseCASQueue,
+ message.asString(AsynchAEMessage.CasReference) });
+ }
+
+ // Fetch an entry from the client cache for a given input CAS id. This would be an id
+ // of the CAS that the client sent out to the service.
+
+ ClientRequest inputCasCachedRequest = (ClientRequest) clientCache.get(inputCasReferenceId);
+ if (inputCasCachedRequest == null) {
+ System.out.println(".... Client Sent FREE CAS request for CAS:"+casReferenceId+" Input CAS:"+inputCasReferenceId+" Entry Not Found in Client Cache");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ // Most likely expired message. Already handled as timeout. Discard the message and move on
+ // to the next
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleProcessReplyFromCasMultiplier",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_expired_msg_INFO",
+ new Object[] { message.asString(AsynchAEMessage.MessageFrom),
+ message.asString(AsynchAEMessage.InputCasReference) });
+ }
+ return;
+ }
+ /*
+ *
+ * UNCOMMENT THIS
+ if (inputCasCachedRequest.isSynchronousInvocation()) {
+ // with synchronous invocation, child CASes are thrown away. With sync API, the UIMA-AS client
+ // is not using callbacks.
+ if ( casReferenceId.equals(inputCasCachedRequest.getCasReferenceId())) {
+ handleProcessReplyFromSynchronousCall(inputCasCachedRequest, message);
+ } else {
+ return;
+ }
+ }
+ CAS cas = null;
+ if (message instanceof TextMessage) {
+ cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL);
+ } else {
+ long bodyLength = ((BytesMessage) message).getBodyLength();
+ byte[] serializedCas = new byte[(int) bodyLength];
+ ((BytesMessage) message).readBytes(serializedCas);
+ cas = deserializeCAS(serializedCas, SHADOW_CAS_POOL);
+
+ }
+
+ completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest,
+ null);
+ */
+ }
private void handleProcessReplyFromCasMultiplier(Message message, String casReferenceId,
int payload /* , ClientRequest inputCasCachedRequest */) throws Exception {
@@ -1530,6 +1981,9 @@ public abstract class BaseUIMAAsynchrono
// exists in the client's cache.
// Fetch the input CAS Reference Id from which the CAS being processed was generated from
String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch the destination for Free CAS notification
+ // Destination freeCASNotificationDestination = message.getJMSReplyTo();
+
try {
String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP);
String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
@@ -1732,14 +2186,21 @@ public abstract class BaseUIMAAsynchrono
decrementOutstandingCasCounter();
}
}
-
+ protected void handleException(Exception exception,String inputCasReferenceId, ClientRequest cachedRequest, boolean doNotify)
+ throws Exception {
+
+ String casReferenceId = cachedRequest.getCasReferenceId();
+ handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+ }
protected void handleException(Message message, ClientRequest cachedRequest, boolean doNotify)
throws Exception {
Exception exception = retrieveExceptionFromMessage(message);
- String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
+// String casReferenceId = message.getStringProperty(AsynchAEMessage.CasReference);
String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
- handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+// handleException(exception, casReferenceId, inputCasReferenceId, cachedRequest, doNotify);
+ handleException(exception, inputCasReferenceId, cachedRequest, true);
+
}
private void dumpMetrics(List<AnalysisEnginePerformanceMetrics> ml, String casReferenceId ) {
@@ -1769,6 +2230,7 @@ public abstract class BaseUIMAAsynchrono
}
return ml;
}
+ /*
private void completeProcessingReply(CAS cas, String casReferenceId, int payload,
boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt)
throws Exception {
@@ -1837,7 +2299,186 @@ public abstract class BaseUIMAAsynchrono
}
}
}
+*/
+ private void completeProcessingReply( boolean doNotify, UimaASClientMessage message, ProcessTrace pt)
+ throws Exception {
+ ClientRequest cachedRequest =
+ (ClientRequest) clientCache.get(message.asString(AsynchAEMessage.CasReference));
+ int payload = message.asInt(AsynchAEMessage.Payload);
+
+ if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload
+ || AsynchAEMessage.CASRefID == payload) {
+ if (pt == null) {
+ pt = new ProcessTrace_impl();
+ }
+ try {
+ // Log stats and populate ProcessTrace object
+
+ // UNCOMMENT BELOW. NEED THIS. 08/24/2016 !!!!!!!!!!!1
+ // logTimingInfo(message, pt, cachedRequest);
+ // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ if (doNotify) {
+ UimaASProcessStatusImpl status;
+ String inputCasReferenceId = message.asString(AsynchAEMessage.InputCasReference);
+ if (inputCasReferenceId != null
+ && inputCasReferenceId.equals(cachedRequest.getCasReferenceId())) {
+ status = new UimaASProcessStatusImpl(pt, cachedRequest.getCAS(), message.asString(AsynchAEMessage.CasReference), inputCasReferenceId);
+ } else {
+ status = new UimaASProcessStatusImpl(pt, cachedRequest.getCAS(), message.asString(AsynchAEMessage.CasReference));
+ }
+ if ( message.contains(AsynchAEMessage.CASPerComponentMetrics)) {
+ // Add CAS identifier to enable matching replies with requests
+ // notifyListeners(cachedRequest.getCAS(), status, AsynchAEMessage.Process, message.asString(AsynchAEMessage.CASPerComponentMetrics));
+ } else {
+ // Add CAS identifier to enable matching replies with requests
+ notifyListeners(cachedRequest.getCAS(), status, AsynchAEMessage.Process);
+ }
+ } else { // synchronous sendAndReceive() was used
+ if (message.asString(AsynchAEMessage.CasReference) != null && message.contains(AsynchAEMessage.CASPerComponentMetrics) ) {
+ //cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
+ cachedRequest.getComponentMetricsList().
+ addAll(UimaSerializer.deserializePerformanceMetrics(message.asString(AsynchAEMessage.CASPerComponentMetrics)));
+ }
+ }
+ }
+ } finally {
+ // Dont release the CAS if the application uses synchronous API
+// if (remoteService && !cachedRequest.isSynchronousInvocation()) {
+ if ( !cachedRequest.isSynchronousInvocation()) {
+ if (cachedRequest.getCAS() != null) {
+ System.out.println("... Releasing CAS "+cachedRequest.getCasReferenceId());
+ cachedRequest.getCAS().release();
+ }
+ }
+ System.out.println("1 /////////////////////////////////// Client.hashCode()-"+this.hashCode()+" calling removeFromCache()- CAS:"+message.asString(AsynchAEMessage.CasReference));
+
+ removeFromCache(message.asString(AsynchAEMessage.CasReference));
+ }
+ }
+ }
+ private void completeProcessingReply(CAS cas, String casReferenceId, int payload,
+ boolean doNotify,Message message, ClientRequest cachedRequest, ProcessTrace pt)
+ throws Exception {
+ if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload
+ || AsynchAEMessage.CASRefID == payload) {
+ if (pt == null) {
+ pt = new ProcessTrace_impl();
+ }
+ try {
+ // Log stats and populate ProcessTrace object
+ logTimingInfo(message, pt, cachedRequest);
+ if (doNotify) {
+ UimaASProcessStatusImpl status;
+ String inputCasReferenceId = message.getStringProperty(AsynchAEMessage.InputCasReference);
+ if (inputCasReferenceId != null
+ && inputCasReferenceId.equals(cachedRequest.getCasReferenceId())) {
+ status = new UimaASProcessStatusImpl(pt, cas,casReferenceId, inputCasReferenceId);
+ } else {
+ status = new UimaASProcessStatusImpl(pt, cas, casReferenceId);
+ }
+ if ( message.propertyExists(AsynchAEMessage.CASPerComponentMetrics)) {
+ List<AnalysisEnginePerformanceMetrics> ml =
+ getMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics), casReferenceId, UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST));
+
+ // Add CAS identifier to enable matching replies with requests
+ notifyListeners(cas, status, AsynchAEMessage.Process, ml);//message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics));
+ } else {
+ // Add CAS identifier to enable matching replies with requests
+ notifyListeners(cas, status, AsynchAEMessage.Process);
+ }
+ } else { // synchronous sendAndReceive() was used
+ if (casReferenceId != null && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+ cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
+ cachedRequest.getComponentMetricsList().
+ addAll(UimaSerializer.deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+ }
+ }
+ }
+ } finally {
+ // Dont release the CAS if the application uses synchronous API
+ if (remoteService && !cachedRequest.isSynchronousInvocation()) {
+ if (cas != null) {
+ cas.release();
+ }
+ }
+ System.out.println("2 /////////////////////////////////// Client.hashCode()-"+this.hashCode()+" calling removeFromCache()- CAS:"+casReferenceId);
+ removeFromCache(casReferenceId);
+ }
+ }
+ }
+ private boolean isProcessRequest(PendingMessage pm) {
+ return pm.getMessageType() == AsynchAEMessage.Process;
+ }
+ private boolean isGetMetaRequest(PendingMessage pm) {
+ return pm.getMessageType() == AsynchAEMessage.GetMeta;
+ }
+
+ public void beforeDispatch(PendingMessage pm) {
+ if (isProcessRequest(pm)) {
+ ClientRequest cacheEntry =
+ (ClientRequest) getCache().get( pm.getPropertyAsString(AsynchAEMessage.CasReference));
+ cacheEntry.setCASDepartureTime(System.nanoTime());
+ // Add the cas to a list of CASes pending reply. Also start the timer if necessary
+ getServiceDelegate().
+ addCasToOutstandingList(cacheEntry.getCasReferenceId(), cacheEntry.getCAS().hashCode(), timerPerCAS); // true=timer per cas
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_cas_added_to_pending_FINE", new Object[]
+ { pm.getPropertyAsString(AsynchAEMessage.CasReference),
+ String.valueOf(cacheEntry.getCAS().hashCode()),
+ getServiceDelegate().toString()});
+ }
+
+ } else if (isGetMetaRequest(pm) && getServiceDelegate().getGetMetaTimeout() > 0) {
+
+ getServiceDelegate().startGetMetaRequestTimer();
+ }
+ }
+ public void onMessage(DirectMessage message) {
+ if ( !this.running) {
+ return; // ignore the message if the client is stopping
+ }
+
+ onMessage(new UimaASClientDirectMessage(message));
+ }
+ protected void onMessage( UimaASClientMessage message ) {
+ // process service replies
+ try {
+
+ /*
+ UimaASCommandHandler commandHandler = CommandHandlers.newCommandHandler(this, message.command());
+ commandHandler.handle(message);
+
+ */
+ switch(message.command()) {
+ case AsynchAEMessage.GetMeta: // received GetMeta reply from a service
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
+ new Object[] { message.messageFrom()});
+ }
+ System.out.println("... Processing GetMeta Reply");
+ handleMetadataReply(message);
+ break;
+ case AsynchAEMessage.Process: // received Process reply from a service
+ System.out.println("onMessage() - recv'd Process reply");
+ handleProcessReply(message, true, null);
+ break;
+
+ case AsynchAEMessage.CollectionProcessComplete: // received Process reply from a service
+ System.out.println("onMessage() - recv'd CPC reply");
+ handleCollectionProcessCompleteReply(message);
+ break;
+ }
+ } catch( Throwable t) {
+ t.printStackTrace();
+ }
+ }
private void logTimingInfo(Message message, ProcessTrace pt, ClientRequest cachedRequest)
throws Exception {
clientSideJmxStats.incrementTotalNumberOfCasesProcessed();
@@ -2069,7 +2710,28 @@ public abstract class BaseUIMAAsynchrono
uimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
return cas;
}
+ private CAS deserializeCAS(ClientRequest cachedRequest) throws Exception {
+ CAS cas = null;
+
+ Message message = cachedRequest.getMessage();
+ if (message != null) {
+ boolean deltaCas = false;
+ if (message.propertyExists(AsynchAEMessage.SentDeltaCas)) {
+ deltaCas = message.getBooleanProperty(AsynchAEMessage.SentDeltaCas);
+ }
+
+ if (message instanceof TextMessage) {
+ cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest, deltaCas);
+ } else {
+ long bodyLength = ((BytesMessage) message).getBodyLength();
+ byte[] serializedCas = new byte[(int) bodyLength];
+ ((BytesMessage) message).readBytes(serializedCas);
+ cas = deserializeCAS(serializedCas, cachedRequest);
+ }
+ }
+ return cas;
+ }
/**
* Listener method receiving JMS Messages from the response queue.
*
@@ -2078,6 +2740,19 @@ public abstract class BaseUIMAAsynchrono
if ( !this.running) {
return; // ignore the message if the client is stopping
}
+ /*
+
+ UimaMessage um = new JmsMessage(message);
+ MessageHandler mh = MessageHandlers.createHandler(this);
+ mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newProcessCASCommand());
+ mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newProcessChildCASCommand());
+ mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newGetMetaCommand());
+ mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newCPCCommand());
+ mh.addCommandHandler(CommandFactory.get(CommandFactory.JMS).newServiceInfoCommand());
+
+ mh.handle(um);
+
+ */
// Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
// This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock
// is only released if this method returns. Running in a dispatch thread caused a hang when an application
@@ -2107,6 +2782,8 @@ public abstract class BaseUIMAAsynchrono
}
int command = message.getIntProperty(AsynchAEMessage.Command);
+
+ System.out.println(" >>>>>>>>>> Received Reply for Command:"+command);
if (AsynchAEMessage.CollectionProcessComplete == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
@@ -2120,7 +2797,9 @@ public abstract class BaseUIMAAsynchrono
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
+ System.out.println(">>>>>> Handling GetMeta Reply");
handleMetadataReply(message);
+ System.out.println(">>>>>> Done Handling GetMeta Reply");
} else if (AsynchAEMessage.Process == command) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
@@ -2148,10 +2827,92 @@ public abstract class BaseUIMAAsynchrono
}
});
-
+
}
/**
+ * Listener method receiving JMS Messages from the response queue.
+ *
+ */
+ /*
+ public void onMessage(final Message message) {
+ if ( !this.running) {
+ return; // ignore the message if the client is stopping
+ }
+ // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
+ // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock
+ // is only released if this method returns. Running in a dispatch thread caused a hang when an application
+ // decided to call System.exit() in any of its callback listener methods. The UIMA AS client adds a
+ // ShutdownHoook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer
+ // and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the
+ // application doesnt call client's stop() method. Now, the hang was caused by the fact that the dispatch
+ // thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook
+ // code runs in a separate thread, but the the JVM blocks the dispatch thread until the ShutdownHook
+ // finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter
+ // the same critical region that the dispatch thread is still stuck into. DEADLOCK.
+ // The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead
+ // creating one on the fly.
+ exec.execute( new Runnable() {
+
+ public void run() {
+ try {
+
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ if (!message.propertyExists(AsynchAEMessage.Command)) {
+ return;
+ }
+
+ int command = message.getIntProperty(AsynchAEMessage.Command);
+ if (AsynchAEMessage.CollectionProcessComplete == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleCollectionProcessCompleteReply(message);
+ } else if (AsynchAEMessage.GetMeta == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleMetadataReply(message);
+ } else if (AsynchAEMessage.Process == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleProcessReply(message, true, null);
+ } else if (AsynchAEMessage.ServiceInfo == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_service_info_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleServiceInfo(message);
+ }
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+
+ }
+
+ }
+ });
+
+ }
+*/
+ /**
* Gets the ProcessingResourceMetadata for the asynchronous AnalysisEngine.
*/
public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException {
@@ -2207,6 +2968,8 @@ public abstract class BaseUIMAAsynchrono
}
ClientRequest cachedRequest = produceNewClientRequestObject();
+ clientCache.put(cachedRequest.getCasReferenceId(), cachedRequest);
+
cachedRequest.setSynchronousInvocation();
// cachedRequest.setTargetServiceId(targetServiceId);
// save application provided List where the performance stats will be copied
@@ -2371,9 +3134,38 @@ public abstract class BaseUIMAAsynchrono
try {
// Process reply in the send thread
Message message = cachedRequest.getMessage();
- if (message != null) {
- deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+ if (message == null) {
+ throw new IllegalArgumentException("Client Recv'd Invalid Reply Message for CAS:"+casReferenceId+" The JMS Message is Missing ");
+ }
+
+
+ CAS cas = null;
+ int payload = AsynchAEMessage.None;
+ // Process reply in the send thread
+ if (transport.equals(Transport.JMS)) {
+ if (!running) {
+ return casReferenceId;
+ }
+
+ System.out.println("............ Client.hashCode():"+this.hashCode()+" Recv'd Reply For CAS:"+casReferenceId+" Deserializing");
+ long t1 = System.nanoTime();
+ cas = deserializeCAS(cachedRequest);
+ cachedRequest.setDeserializationTime(System.nanoTime() - t1);
+
+ payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
+ completeProcessingReply(cas, casReferenceId, payload, false, message, cachedRequest, pt);
+
+// deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+
+ } else {
+ cas = cachedRequest.getCAS();
+ completeProcessingReply(cas, casReferenceId, payload, false, message, cachedRequest, pt);
+
}
+
+// if (message != null) {
+// deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+// }
} catch (ResourceProcessException rpe) {
throw rpe;
} catch (Exception e) {
@@ -2620,6 +3412,8 @@ public abstract class BaseUIMAAsynchrono
List<AnalysisEnginePerformanceMetrics> componentMetricsList;
+ private UimaASClientMessage clientMessage;
+
// flag to indicate if the remote service acknowledged receiving a CAS for processing
private volatile boolean receivedServiceACK=false;
@@ -2632,7 +3426,16 @@ public abstract class BaseUIMAAsynchrono
public void setTargetServiceId(String serviceTargetId) {
this.targetServiceId = serviceTargetId;
}
-
+ public boolean isThisClientTheCasOwner(BaseUIMAAsynchronousEngineCommon_impl client) {
+ System.out.println("......isThisClientTheCasOwner() - client.hashCode():"+client.hashCode()+" uimaEEEngine.hashCode():"+uimaEEEngine.hashCode());
+ return client.equals(uimaEEEngine);
+ }
+ public UimaASClientMessage getClientMessage() {
+ return clientMessage;
+ }
+ public void setMessage(UimaASClientMessage message) {
+ this.clientMessage = message;
+ }
public boolean receivedServiceACK() {
return receivedServiceACK;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java Mon Feb 26 18:54:11 2018
@@ -33,7 +33,7 @@ import org.apache.uima.aae.message.UIMAM
import org.apache.uima.cas.SerialFormat;
public class JmsMessageContext implements MessageContext {
- private static final Class CLASS_NAME = JmsMessageContext.class;
+ private static final Class<?> CLASS_NAME = JmsMessageContext.class;
private Message message;
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/PendingMessage.java Mon Feb 26 18:54:11 2018
@@ -18,18 +18,12 @@
*/
package org.apache.uima.adapter.jms.message;
-import java.util.HashMap;
-
-public class PendingMessage extends HashMap<Object, Object> {
-
-private static final long serialVersionUID = 3512718154731557413L;
-private int messageType;
-
- public PendingMessage(int aMessageType) {
- messageType = aMessageType;
- }
-
- public int getMessageType() {
- return messageType;
- }
+public interface PendingMessage {
+
+ public int getMessageType();
+ public Object getProperty(String propertyKey);
+ public void addProperty(String propertyKey, Object property);
+ public String getPropertyAsString(String key);
+ public int getPropertyAsInt(String key);
+ public byte[] getPropertyAsBytesArray(String key);
}