You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [6/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Feb 26 18:54:11 2018
@@ -19,7 +19,6 @@
package org.apache.uima.aae.controller;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
@@ -52,11 +51,11 @@ import org.apache.uima.aae.EECasManager_
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UIDGenerator;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
-import org.apache.uima.aae.UimaAsContext;
import org.apache.uima.aae.UimaAsVersion;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.UimaEEAdminContext;
@@ -69,7 +68,6 @@ import org.apache.uima.aae.error.ErrorCo
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.ErrorHandlerChain;
import org.apache.uima.aae.error.ForcedMessageTimeoutException;
-import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaAsUncaughtExceptionHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.jmx.JmxManagement;
@@ -83,15 +81,16 @@ import org.apache.uima.aae.monitor.Monit
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.aae.monitor.statistics.Statistic;
import org.apache.uima.aae.monitor.statistics.Statistics;
-import org.apache.uima.aae.spi.transport.UimaMessage;
import org.apache.uima.aae.spi.transport.UimaMessageListener;
import org.apache.uima.aae.spi.transport.UimaTransport;
-import org.apache.uima.aae.spi.transport.vm.VmTransport;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.analysis_engine.metadata.SofaMapping;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.Listener;
+import org.apache.uima.as.client.Listener.Type;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.CollectionReaderDescription;
import org.apache.uima.resource.PearSpecifier;
@@ -104,10 +103,16 @@ import org.apache.uima.util.Level;
public abstract class BaseAnalysisEngineController extends Resource_ImplBase implements
AnalysisEngineController, EventSubscriber {
- private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
+ public static enum ENDPOINT_TYPE {
+ JMS,
+ DIRECT
+ };
+ private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class;
private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
- public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
+ public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
public static final boolean NO_RECOVERY = true;
+ public static final ENDPOINT_TYPE DEFAULT_OUTPUTCHANNEL_TYPE = ENDPOINT_TYPE.JMS;
+
// Semaphore use only when quiesceAndStop is called
// When the cache becomes empty the semaphore is released.
private Semaphore quiesceSemaphore = new Semaphore(0);
@@ -148,9 +153,10 @@ public abstract class BaseAnalysisEngine
protected long errorCount = 0;
- protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
+// protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>();
- protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new ConcurrentHashMap<String, InputChannel>();
+ protected ConcurrentHashMap<ENDPOINT_TYPE, InputChannel> inputChannelMap =
+ new ConcurrentHashMap<>();
private UimaEEAdminContext adminContext;
@@ -219,6 +225,9 @@ public abstract class BaseAnalysisEngine
private Object lock = new Object();
+ protected Map<String, OutputChannel> outputChannelMap =
+ new HashMap<String, OutputChannel>();
+
// Local cache for this controller only. This cache stores state of
// each CAS. The actual CAS is still stored in the global cache. The
// local cache is used to determine when each CAS can be removed as
@@ -260,6 +269,8 @@ public abstract class BaseAnalysisEngine
private String serviceName=null;
+ private String serviceId="";
+
protected UimaContext uimaContext=null;
public abstract void dumpState(StringBuffer buffer, String lbl1);
@@ -297,7 +308,10 @@ public abstract class BaseAnalysisEngine
int aComponentCasPoolSize, long anInitialCasHeapSize, String anEndpointName,
String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache,
Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception {
- casManager = aCasManager;
+
+ System.out.println("C'tor Called Descriptor:"+aDescriptor);
+
+ casManager = aCasManager;
inProcessCache = anInProcessCache;
localCache = new LocalCache(this);
aeDescriptor = aDescriptor;
@@ -456,11 +470,25 @@ public abstract class BaseAnalysisEngine
}
// Register InProcessCache with JMX under the top level component
- if (inProcessCache != null && isTopLevelComponent()) {
- inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
- + inProcessCache.getName());
- ObjectName on = new ObjectName(inProcessCache.getName());
- jmxManagement.registerMBean(inProcessCache, on);
+// if (inProcessCache != null && isTopLevelComponent()) {
+// inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+// + inProcessCache.getName());
+// ObjectName on = new ObjectName(inProcessCache.getName());
+// jmxManagement.registerMBean(inProcessCache, on);
+// }
+ if (inProcessCache != null && !inProcessCache.isRegisteredWithJMX() && isTopLevelComponent()) {
+
+ if ( jmxManagement.isRegistered(new ObjectName("org.apache.uima:type=ee.jms.services,s=Test Aggregate TAE Uima EE Service,p0=Test Aggregate TAE Components,name=InProcessCache"))) {
+ System.out.println("!!!!!!!!!!!!!!!!!! InProcessCache Already Registered");
+ } else {
+ System.out.println(">>>>>>>>> Registering InProcessCache with JMX");
+ inProcessCache.setName(jmxManagement.getJmxDomain() + jmxContext + ",name="
+ + inProcessCache.getName());
+ System.out.println("-------->>>>>>>>> InProcessCache Object Name:"+inProcessCache.getName());
+ ObjectName on = new ObjectName(inProcessCache.getName());
+ jmxManagement.registerMBean(inProcessCache, on);
+ inProcessCache.setRegisteredWithJMX();
+ }
}
initializeServiceStats();
@@ -582,7 +610,7 @@ public abstract class BaseAnalysisEngine
public AnalysisEngineController getParentController() {
return parentController;
}
-
+/*
public UimaTransport getTransport(String aKey) throws Exception {
return getTransport(null, aKey);
}
@@ -609,10 +637,11 @@ public abstract class BaseAnalysisEngine
return transport;
}
-
+*/
/**
* Initializes transport used for internal messaging between collocated Uima AS services.
*/
+ /*
public void initializeVMTransport(int parentControllerReplyConsumerCount) throws Exception {
// If this controller is an Aggregate Controller, force delegates to initialize
// their internal transports.
@@ -686,7 +715,7 @@ public abstract class BaseAnalysisEngine
}
}
-
+*/
public synchronized UimaMessageListener getUimaMessageListener(String aDelegateKey) {
return messageListeners.get(aDelegateKey);
}
@@ -818,6 +847,10 @@ public abstract class BaseAnalysisEngine
public void addUimaObject(String objectName ) throws Exception {
jmxManagement.addObject(objectName);
}
+ public void addOutputChannel(OutputChannel outputChannel) throws Exception {
+ outputChannelMap.put( outputChannel.getType().name(), outputChannel);
+// this.outputChannel = outputChannel;
+ }
/**
* Register a component with a given name with JMX MBeanServer
*
@@ -867,6 +900,7 @@ public abstract class BaseAnalysisEngine
registerWithAgent(servicePerformance, name);
servicePerformance.setIdleTime(System.nanoTime());
+ /*
ServiceInfo serviceInfo = null;
if (remote) {
serviceInfo = getInputChannel().getServiceInfo();
@@ -886,6 +920,7 @@ public abstract class BaseAnalysisEngine
}
}
}
+ */
ServiceInfo pServiceInfo = null;
if (this instanceof PrimitiveAnalysisEngineController) {
@@ -906,8 +941,12 @@ public abstract class BaseAnalysisEngine
}
+// name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
+// + serviceInfo.getLabel();
+
name = jmxManagement.getJmxDomain() + key_value_list + ",name=" + thisComponentName + "_"
- + serviceInfo.getLabel();
+ + pServiceInfo.getLabel();
+
if (!isTopLevelComponent()) {
pServiceInfo.setBrokerURL("Embedded Broker");
} else {
@@ -1092,7 +1131,7 @@ public abstract class BaseAnalysisEngine
public void setInputChannel(InputChannel anInputChannel) throws Exception {
inputChannel = anInputChannel;
- inputChannelList.add(anInputChannel);
+// inputChannelList.add(anInputChannel);
inputChannelLatch.countDown();
if (!registeredWithJMXServer) {
@@ -1102,6 +1141,7 @@ public abstract class BaseAnalysisEngine
}
public void addInputChannel(InputChannel anInputChannel) {
+ /*
inputChannelLatch.countDown();
if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
@@ -1109,18 +1149,65 @@ public abstract class BaseAnalysisEngine
inputChannelList.add(anInputChannel);
}
}
- }
+ */
+ if (!inputChannelMap.containsKey(anInputChannel.getType())) {
+ inputChannelMap.put(anInputChannel.getType(),anInputChannel);
+ inputChannelLatch.countDown();
+ if (!registeredWithJMXServer) {
+ registerServiceWithJMX(jmxContext, false);
+ registeredWithJMXServer = true;
+
+ }
+
+ }
+ if ( anInputChannel.getChannelType().equals(ChannelType.REQUEST_REPLY)) {
+ inputChannel = anInputChannel;
+ }
+ }
public InputChannel getInputChannel() {
- try {
+ return inputChannel;
+ }
+ public InputChannel getInputChannel(ENDPOINT_TYPE type) {
+ /*
+ try {
inputChannelLatch.await();
} catch (Exception e) {
}
return inputChannel;
+ */
+ return inputChannelMap.get(type);
+ }
+ public InputChannel getInputChannel(Class<?> clz) {
+ return inputChannel;
+ }
+ public List<Listener> getAllListeners() {
+ List<Listener> listeners = new ArrayList<Listener>();
+ if ( getInputChannel(ENDPOINT_TYPE.JMS) != null ) {
+ listeners.addAll(getInputChannel(ENDPOINT_TYPE.JMS).getListeners());
+ }
+ if ( getInputChannel(ENDPOINT_TYPE.DIRECT) != null ) {
+ listeners.addAll(getInputChannel(ENDPOINT_TYPE.DIRECT).getListeners());
+ }
+ return listeners;
+ }
+ public void setDirectInputChannel(DirectInputChannel anInputChannel) throws Exception {
+ // inputChannel = anInputChannel;
+
+ // inputChannelList.add(anInputChannel);
+ addInputChannel(anInputChannel);
+
+ inputChannelLatch.countDown();
+ if (!registeredWithJMXServer) {
+ registeredWithJMXServer = true;
+ registerServiceWithJMX(jmxContext, false);
+ }
+ }
+ public void setJmsInputChannel(InputChannel anInputChannel) throws Exception {
+ addInputChannel(anInputChannel);
}
-
public void dropCAS(CAS aCAS) {
if (aCAS != null) {
// Check if this method was called while another thread is stopping the service.
@@ -1180,7 +1267,7 @@ public abstract class BaseAnalysisEngine
if (!isStopped()) {
Endpoint endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
if ( endpoint != null && !"WarmupDelegate".equals(endpoint.getDelegateKey() ) ) {
- getOutputChannel().sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR),
+ getOutputChannel(endpoint).sendReply((Throwable) anErrorContext.get(ErrorContext.THROWABLE_ERROR),
casReferenceId, parentCasReferenceId,
endpoint, AsynchAEMessage.Process);
}
@@ -1346,7 +1433,39 @@ public abstract class BaseAnalysisEngine
}
}
-
+ public void dropCASFromLocal(String aCasReferenceId) {
+ try {
+ CacheEntry entry = null ;
+ if ( inProcessCache.entryExists(aCasReferenceId)) {
+ entry = inProcessCache.getCacheEntryForCAS(aCasReferenceId);
+ }
+ if ( entry != null ) {
+ // Release semaphore which throttles ingestion of CASes from service
+ // input queue.
+ Semaphore semaphore=null;
+ if ( !isPrimitive() && (semaphore = entry.getThreadCompletionSemaphore()) != null ) {
+ semaphore.release();
+ }
+ if (localCache.containsKey(aCasReferenceId)) {
+ try {
+ localCache.lookupEntry(aCasReferenceId).setDropped(true);
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "dropCASFromLocal",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_parent_cas_notin_cache__INFO",
+ new Object[] {getComponentName(), aCasReferenceId });
+ }
+ }
+ localCache.remove(aCasReferenceId);
+ }
+ // Remove stats from the map maintaining CAS specific stats
+ if (perCasStatistics.containsKey(aCasReferenceId)) {
+ perCasStatistics.remove(aCasReferenceId);
+ }
+ }
+ } catch( Exception e) {
+ }
+ }
public void forceTimeoutOnPendingCases(String key) {
Delegate delegate = ((AggregateAnalysisEngineController) this).lookupDelegate(key);
// Cancel the delegate timer. No more responses are expected
@@ -1546,15 +1665,33 @@ public abstract class BaseAnalysisEngine
return false;
}
-
+/*
public OutputChannel getOutputChannel() {
return outputChannel;
}
-
+*/
public void setOutputChannel(OutputChannel outputChannel) throws Exception {
- this.outputChannel = outputChannel;
+// this.outputChannel = outputChannel;
+ outputChannelMap.put(outputChannel.getType().name(), outputChannel);
+
+ }
+ public OutputChannel getOutputChannel(ENDPOINT_TYPE et) {
+ return outputChannelMap.get(et.name());
+// return outputChannel;
+ }
+ public OutputChannel getOutputChannel(Endpoint anEndpoint) {
+ ENDPOINT_TYPE et = ENDPOINT_TYPE.DIRECT;
+
+ if ( anEndpoint.getServerURI().indexOf("http") > -1 ||
+ anEndpoint.getServerURI().indexOf("tcp") > -1 ) {
+ et = ENDPOINT_TYPE.JMS;
+ }
+//System.out.println("............... OutputChannel type:"+et.name());
+ OutputChannel oc = outputChannelMap.get(et.name());
+ return oc;
}
+
public AsynchAECasManager getCasManagerWrapper() {
return casManager;
}
@@ -1567,10 +1704,15 @@ public abstract class BaseAnalysisEngine
return inProcessCache;
}
- protected ResourceSpecifier getResourceSpecifier() {
+ public ResourceSpecifier getResourceSpecifier() {
return resourceSpecifier;
}
-
+ public void setServiceId(String sid) {
+ serviceId = sid;
+ }
+ public String getServiceId() {
+ return serviceId;
+ }
public String getName() {
return endpointName;
}
@@ -1794,12 +1936,12 @@ public abstract class BaseAnalysisEngine
public String getBrokerURL() {
// Wait until the connection factory is injected by Spring
- while (System.getProperty("BrokerURI") == null) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException ex) {
- }
- }
+// while (System.getProperty("BrokerURI") == null) {
+// try {
+// Thread.sleep(50);
+// } catch (InterruptedException ex) {
+// }
+// }
return System.getProperty("BrokerURI");
}
@@ -1909,12 +2051,11 @@ public abstract class BaseAnalysisEngine
/*
* Send an exception to the client if this is a top level service
*/
- if (cause != null && aCasReferenceId != null && getOutputChannel() != null
- && isTopLevelComponent()) {
+ if (cause != null && aCasReferenceId != null && isTopLevelComponent()) {
Endpoint clientEndpoint = null;
if ((clientEndpoint = getClientEndpoint()) != null) {
try {
- getOutputChannel().sendReply(cause, aCasReferenceId, null, clientEndpoint,
+ getOutputChannel(clientEndpoint).sendReply(cause, aCasReferenceId, null, clientEndpoint,
clientEndpoint.getCommand());
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1937,11 +2078,16 @@ public abstract class BaseAnalysisEngine
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop__INFO",
new Object[] { getComponentName() });
}
- if (getOutputChannel() != null) {
- getOutputChannel().cancelTimers();
+ for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+ if ( oce.getValue() != null ) {
+ oce.getValue().cancelTimers();
+ }
}
+// if (getOutputChannel() != null) {
+// getOutputChannel().cancelTimers();
+// }
- if (this instanceof PrimitiveAnalysisEngineController) {
+ if (isPrimitive()) {
getControllerLatch().release();
// Stops the input channel of this service
stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
@@ -1986,8 +2132,17 @@ public abstract class BaseAnalysisEngine
adminContext = null;
} else {
// Stop output channel
- getOutputChannel().stop();
-
+ for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+ if ( oce.getValue() != null ) {
+ oce.getValue().stop();
+ }
+ }
+ //getOutputChannel().stop();
+ try {
+ // Remove all MBeans registered by this service
+ jmxManagement.destroy();
+ } catch (Exception e) {
+ }
try {
getInProcessCache().destroy();
} catch (Exception e) {
@@ -2002,9 +2157,9 @@ public abstract class BaseAnalysisEngine
if (statsMap != null) {
statsMap.clear();
}
- if (inputChannelList != null) {
- inputChannelList.clear();
- }
+// if (inputChannelList != null) {
+// inputChannelList.clear();
+// }
//inputChannel = null;
if (serviceErrorMap != null) {
@@ -2031,9 +2186,9 @@ public abstract class BaseAnalysisEngine
if (threadStateMap != null) {
threadStateMap.clear();
}
- if (inputChannelMap != null) {
- inputChannelMap.clear();
- }
+// if (inputChannelMap != null) {
+// inputChannelMap.clear();
+// }
if (controllerListeners != null) {
controllerListeners.clear();
}
@@ -2082,10 +2237,18 @@ public abstract class BaseAnalysisEngine
// we proceed with the shutdown of delegates and finally of the top level service.
if (isTopLevelComponent()) {
getInputChannel().setTerminating();
-
+ try {
+ // Stops all input channels of this service, but keep temp reply queue input channels open
+ // to process replies.
+ stopReceivingCASes(false); // dont kill listeners on temp queues. The remotes may send replies
+
+ } catch( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
// Stops all input channels of this service, but keep temp reply queue input channels open
// to process replies.
- stopReceivingCASes(false); // dont kill listeners on temp queues. The remotes may send replies
if ( this instanceof PrimitiveAnalysisEngineController_impl &&
((PrimitiveAnalysisEngineController_impl)this).aeInstancePool != null ) {
// Since we are quiescing, destroy all AEs that are in AE pool. Those that
@@ -2120,8 +2283,11 @@ public abstract class BaseAnalysisEngine
stopReceivingCASes(true);
stopInputChannels(InputChannel.InputChannels, true);
System.out.println("UIMA-AS Service is Stopping, All CASes Have Been Processed");
- } catch( InterruptedException e) {
-
+ } catch( Exception e) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+
}
stop(true);
}
@@ -2163,16 +2329,26 @@ public abstract class BaseAnalysisEngine
((BaseAnalysisEngineController) parentController).stop();
} else if (!isStopped()) {
stopDelegateTimers();
- getOutputChannel().cancelTimers();
- InputChannel iC = getInputChannel(endpointName);
+ for( Entry<String, OutputChannel> oce : outputChannelMap.entrySet()) {
+ if ( oce.getValue() != null ) {
+ oce.getValue().cancelTimers();
+ }
+ }
+ // getOutputChannel().cancelTimers();
+ InputChannel iC = getInputChannel();
if ( iC != null) {
iC.setTerminating();
}
- // Stop the inflow of new input CASes
- stopInputChannel(true); // shutdownNow
- if ( iC != null ) {
- iC.terminate();
+ try {
+ // Stop the inflow of new input CASes
+ stopInputChannel(true); // shutdownNow
+ if ( iC != null ) {
+ iC.terminate();
+ }
+ } catch (Exception e) {
+
}
+
stopCasMultipliers();
stopTransportLayer();
if (cause != null && aCasReferenceId != null) {
@@ -2247,7 +2423,7 @@ public abstract class BaseAnalysisEngine
Endpoint freeCasNotificationEndpoint = casEntry.getFreeCasNotificationEndpoint();
if (freeCasNotificationEndpoint != null) {
freeCasNotificationEndpoint.setCommand(AsynchAEMessage.Stop);
- getOutputChannel().sendRequest(AsynchAEMessage.Stop, aCasReferenceId,
+ getOutputChannel(freeCasNotificationEndpoint).sendRequest(AsynchAEMessage.Stop, aCasReferenceId,
freeCasNotificationEndpoint);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -2298,7 +2474,14 @@ public abstract class BaseAnalysisEngine
* Stops a listener on the main input channel
* @param shutdownNow stop
*/
- protected void stopInputChannel(boolean shutdownNow) {
+ protected void stopInputChannel(boolean shutdownNow) throws Exception {
+ for( Listener listener : inputChannel.getListeners()) {
+ if ( listener.getType().equals(Type.GetMeta) ||
+ listener.getType().equals(Type.ProcessCAS)) {
+ inputChannel.disconnectListenerFromQueue(listener);
+ }
+ }
+ /*
InputChannel iC = getInputChannel(endpointName);
if (iC != null && !iC.isStopped()) {
try {
@@ -2312,15 +2495,27 @@ public abstract class BaseAnalysisEngine
}
}
}
+ */
}
private void setInputChannelForNoRecovery() {
+ /*
if ( inputChannelMap.size() > 0 ) {
InputChannel iC = getInputChannel();
iC.setTerminating();
}
+ */
+ inputChannel.setTerminating();
}
- protected void stopReceivingCASes(boolean stopAllListeners) {
-
+ private boolean isTempListener( Listener listener ) {
+ return listener.getType().equals(Type.Reply) || listener.getType().equals(Type.FreeCAS);
+ }
+ protected void stopReceivingCASes(boolean stopAllListeners) throws Exception {
+ for( Listener listener : inputChannel.getListeners()) {
+ if ( stopAllListeners || !isTempListener(listener) ) {
+ inputChannel.disconnectListenerFromQueue(listener);
+ }
+ }
+/*
InputChannel iC = null;
setInputChannelForNoRecovery();
Iterator<String> it = inputChannelMap.keySet().iterator();
@@ -2357,10 +2552,21 @@ public abstract class BaseAnalysisEngine
}
}
}
-
+ */
}
protected void stopInputChannels( int channelsToStop, boolean shutdownNow) { //, boolean norecovery) {
- InputChannel iC = null;
+ try {
+ for( Entry<BaseAnalysisEngineController.ENDPOINT_TYPE, InputChannel> ic : inputChannelMap.entrySet()) {
+ ic.getValue().stop(shutdownNow);
+ }
+ // inputChannel.stop(shutdownNow);
+ } catch( Exception e ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ /*
+ InputChannel iC = null;
setInputChannelForNoRecovery();
Iterator it = inputChannelMap.keySet().iterator();
int i = 1;
@@ -2401,7 +2607,7 @@ public abstract class BaseAnalysisEngine
}
}
}
-
+ */
}
/**
* Aggregates have more than one Listener channel. This method stops all configured input channels
@@ -2426,7 +2632,7 @@ public abstract class BaseAnalysisEngine
}
return null;
}
-
+/*
public InputChannel getInputChannel(String anEndpointName) {
for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) {
@@ -2437,7 +2643,7 @@ public abstract class BaseAnalysisEngine
}
return null;
}
-
+*/
// public InputChannel getReplyInputChannel(String aDelegateKey) {
public InputChannel getReplyInputChannel(String aDestination) {
InputChannel IC = null;
@@ -2916,6 +3122,7 @@ public abstract class BaseAnalysisEngine
if ( anEndpoint.getServerURI().equals("vm://localhost?broker.persistent=false")) {
anEndpoint.setRemote(true);
}
+ /*
if (!anEndpoint.isRemote()) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
@@ -2946,8 +3153,12 @@ public abstract class BaseAnalysisEngine
} else {
getOutputChannel().sendReply(metadata, anEndpoint, true);
}
+ */
+ getOutputChannel(anEndpoint).sendReply(metadata, anEndpoint, true);
+
}
} catch (Exception e) {
+ e.printStackTrace();
HashMap map = new HashMap();
map.put(AsynchAEMessage.Endpoint, anEndpoint);
map.put(AsynchAEMessage.MessageType, Integer.valueOf(AsynchAEMessage.Request));
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerStatusListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,35 @@
+package org.apache.uima.aae.controller;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger;
+
+public class ControllerStatusListener implements ControllerCallbackListener {
+ CountDownLatch latch;
+ public ControllerStatusListener(CountDownLatch latch) {
+ this.latch = latch;
+ }
+ public void notifyOnTermination(String aMessage, EventTrigger cause) {
+ }
+
+ public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e) {
+ }
+
+ public void notifyOnInitializationSuccess(AnalysisEngineController aController) {
+ System.out.println("------- Controller:"+aController.getName()+" Initialized");
+ latch.countDown();
+ }
+
+ public void notifyOnInitializationFailure(Exception e) {
+ }
+
+ public void notifyOnInitializationSuccess() {
+ }
+
+ public void notifyOnReconnecting(String aMessage) {
+ }
+
+ public void notifyOnReconnectionSuccess() {
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/DelegateEndpoint.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,127 @@
+package org.apache.uima.aae.controller;
+
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class DelegateEndpoint {
+ public class Builder {
+ private Endpoint_impl e1 = new Endpoint_impl();
+ private String key;
+ private String uri;
+ private String endpointName;
+ private Object replyTo;
+ private String replyDestinatioName;
+ private int getMetaTimeout=0;
+ private int processTimeout=0;
+ private String getMetaActionOnError;
+ private String processActionOnError;
+ private int getMetaMaxRetries=0;
+ private int processsMaxRetries=0;
+ private boolean isTempReplyDestination;
+ private int collectionProcessCompleteTimeout=0;
+ private String serializer=SerialFormat.XMI.getDefaultFileExtension();
+ private int scaleout=1;
+ private String descriptor;
+ private ResourceSpecifier rs;
+ private boolean isRemote;
+
+ public Builder setRemote(boolean isRemote) {
+ this.isRemote = isRemote;
+ if ( !isRemote ) {
+ setServerURI("java");
+ }
+ return this;
+ }
+ public Builder withResourceSpecifier(ResourceSpecifier rs) {
+ this.rs = rs;
+ setDescriptor(rs.getSourceUrlString());
+ return this;
+ }
+ public Builder setTempDestination(boolean isTempReplyDestination) {
+ this.isTempReplyDestination = isTempReplyDestination;
+ return this;
+ }
+ public Builder setDescriptor(String descriptor) {
+ this.descriptor = descriptor;
+ return this;
+ }
+
+ public Builder setScaleout(int scaleout) {
+ this.scaleout = scaleout;
+ return this;
+ }
+
+ public Builder withDelegateKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public Builder setServerURI(String uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ public Builder withEndpointName(String endpointName) {
+ this.endpointName = endpointName;
+ return this;
+ }
+
+ public Builder setReplyToDestination(Object replyTo) {
+ this.replyTo = replyTo;
+ return this;
+ }
+
+ public Builder setReplyDestinationName(String replyDestinatioName) {
+ this.replyDestinatioName = replyDestinatioName;
+ return this;
+ }
+
+ public Builder setGetMetaErrorHandlingParams(int timeout, int maxRetries, String action) {
+ this.getMetaTimeout = timeout;
+ this.getMetaActionOnError = action;
+ this.getMetaMaxRetries = maxRetries;
+ return this;
+ }
+
+ public Builder setProcessErrorHandlingParams(int timeout, int maxRetries, String action) {
+ this.processTimeout = timeout;
+ this.processActionOnError = action;
+ this.processsMaxRetries = maxRetries;
+ return this;
+ }
+
+ public Builder setCollectionProcessCompleteTimeout(int timeout) {
+ this.collectionProcessCompleteTimeout = timeout;
+ return this;
+ }
+
+ public Builder setSerializer(String serializer) {
+ this.serializer = serializer;
+ return this;
+ }
+
+ public Endpoint_impl build() {
+ e1.setDelegateKey(key);
+ e1.setServerURI(uri);
+ e1.setEndpoint(endpointName);
+ e1.setDescriptor(descriptor);
+ e1.setConcurrentRequestConsumers(scaleout);
+ e1.setDestination(null);
+ e1.setReplyToEndpoint(replyDestinatioName);
+ e1.setMetadataRequestTimeout(getMetaTimeout);
+ e1.setProcessRequestTimeout(processTimeout);
+ e1.setTempReplyDestination(isTempReplyDestination);
+ e1.setCollectionProcessCompleteTimeout(0);
+ e1.setSerializer(SerialFormat.XMI.getDefaultFileExtension());
+ e1.setRemote(isRemote);
+// if ( uri != null && uri.length() > 0 && (uri.contains("tcp:") || uri.contains("http:") )) {
+// e1.setRemote(true);
+// } else {
+// e1.setRemote(false);
+// }
+ e1.setResourceSpecifier(rs);
+ return e1;
+ }
+
+ }
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Mon Feb 26 18:54:11 2018
@@ -31,8 +31,16 @@ public interface Endpoint {
public static final int DISABLED = 3;
+ public boolean isJavaRemote();
+
+ public void setJavaRemote();
+
public int getMetadataRequestTimeout();
+ public void setDisableJCasCache(boolean disableOrEnable);
+
+ public void setCollectionProcessCompleteTimeout(int cpcTimeout);
+
public void setController(AnalysisEngineController aController);
public void startCheckpointTimer();
@@ -47,6 +55,10 @@ public interface Endpoint {
public void setReplyEndpoint(boolean tORf);
+ public void setReplyDestination(Object replyDestination);
+
+ public Object getReplyDestination();
+
public boolean isReplyEndpoint();
public void setProcessRequestTimeout(int processRequestTimeout);
@@ -129,6 +141,10 @@ public interface Endpoint {
public void setDestination(Object aDestination);
+ public void setGetMetaDestination(Object aDestination);
+
+ public Object getMetaDestination();
+
public void setCommand(int aCommand);
public int getCommand();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Mon Feb 26 18:54:11 2018
@@ -27,14 +27,21 @@ import org.apache.uima.aae.jmx.ServiceIn
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.TypeSystemImpl;
+import org.apache.uima.resource.ResourceSpecifier;
public class Endpoint_impl implements Endpoint, Cloneable {
- private static final Class CLASS_NAME = Endpoint_impl.class;
+ private static final Class<?> CLASS_NAME = Endpoint_impl.class;
+ private volatile boolean javaRemote=false;
+
private volatile Object destination = null;
private String endpoint; // is the queue name (only)
+ private volatile Object getMetaDestination = null;
+
+ private volatile Object replyDestination = null;
+
private String serverURI;
private volatile boolean initialized;
@@ -45,11 +52,11 @@ public class Endpoint_impl implements En
private volatile boolean waitingForResponse;
- private int metadataRequestTimeout;
+ private int metadataRequestTimeout=0;
- private int processRequestTimeout;
+ private int processRequestTimeout=0;
- private int collectionProcessCompleteTimeout;
+ private int collectionProcessCompleteTimeout=0;
private volatile boolean isRemote;
@@ -57,7 +64,7 @@ public class Endpoint_impl implements En
private SerialFormat serialFormat = null;
- private String serializer = "xmi"; // spring bean interface
+ private String serializer = SerialFormat.XMI.getDefaultFileExtension();
private volatile boolean finalEndpoint;
@@ -121,6 +128,23 @@ public class Endpoint_impl implements En
private volatile boolean disableJCasCache;
+ private ResourceSpecifier resourceSpecifier;
+
+ public void setJavaRemote() {
+ javaRemote = true;
+ }
+ public boolean isJavaRemote() {
+ return javaRemote;
+ }
+ public void setReplyDestination(Object replyDestination) {
+ this.replyDestination = replyDestination;
+ }
+ public Object getReplyDestination() {
+ return replyDestination;
+ }
+ public void setResourceSpecifier(ResourceSpecifier rs ) {
+ this.resourceSpecifier = rs;
+ }
public boolean isDisableJCasCache() {
return disableJCasCache;
}
@@ -319,11 +343,6 @@ public class Endpoint_impl implements En
public void setServerURI(String aServerURI) {
this.serverURI = aServerURI;
- if ( aServerURI != null && ( aServerURI.startsWith("vm:") == true && !aServerURI.equals("vm://localhost?broker.persistent=false") ) ){
- setRemote(false);
- } else {
- setRemote(true);
- }
}
public void setWaitingForResponse(boolean isWaiting) {
@@ -436,7 +455,12 @@ public class Endpoint_impl implements En
isRemote = aRemote;
}
-
+ public void setGetMetaDestination(Object aDestination) {
+ getMetaDestination = aDestination;
+ }
+ public Object getMetaDestination() {
+ return getMetaDestination;
+ }
public String getDescriptor() {
return descriptor;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Mon Feb 26 18:54:11 2018
@@ -71,7 +71,7 @@ public class LocalCache extends Concurre
if (entry != null && entry.isSubordinate()) {
// recursively call each parent until we get to the top of the
// Cas hierarchy
- parentCasReferenceId = lookupInputCasReferenceId(entry.getInputCasReferenceId());
+ parentCasReferenceId = lookupInputCasReferenceId(entry.getParentCasReferenceId());
} else {
return aCasReferenceId;
}
@@ -85,14 +85,14 @@ public class LocalCache extends Concurre
// recursively call each parent until we get to the top of the
// Cas hierarchy
parentCasReferenceId = lookupInputCasReferenceId((CasStateEntry) get(entry
- .getInputCasReferenceId()));
+ .getParentCasReferenceId()));
} else {
return entry.getCasReferenceId();
}
return parentCasReferenceId;
}
public void dumpContents() {
- dumpContents(false);
+ dumpContents(true);
}
public synchronized void dumpContents(boolean dump2Stdout) {
int count = 0;
@@ -109,7 +109,7 @@ public class LocalCache extends Concurre
if (casStateEntry.isSubordinate()) {
sb.append(entry.getKey() + " Number Of Child CASes In Play:"
+ casStateEntry.getSubordinateCasInPlayCount() + " Parent CAS id:"
- + casStateEntry.getInputCasReferenceId());
+ + casStateEntry.getParentCasReferenceId());
} else {
sb.append(entry.getKey() + " *** Input CAS. Number Of Child CASes In Play:"
+ casStateEntry.getSubordinateCasInPlayCount());
@@ -179,7 +179,7 @@ public class LocalCache extends Concurre
CasStateEntry casStateEntry = lookupEntry(casReferenceId);
if (casStateEntry.isSubordinate()) {
// Recurse until the top CAS reference Id is found
- return getTopCasAncestor(casStateEntry.getInputCasReferenceId());
+ return getTopCasAncestor(casStateEntry.getParentCasReferenceId());
}
// Return the top ancestor CAS id
return casStateEntry;
@@ -187,7 +187,12 @@ public class LocalCache extends Concurre
public static class CasStateEntry {
private String casReferenceId;
-
+ // id of a parent CAS
+ private String parentCasReferenceId;
+ // stores the id of an input CAS sent by the client. This is used
+ // to identify client endpoint if the service is a CM.
+ private String inputCasReferenceId;
+
private volatile boolean waitingForChildren; // true if in FinalState and still has children in play
private volatile boolean waitingForRealease;
@@ -212,8 +217,8 @@ public class LocalCache extends Concurre
private Object childCountMux = new Object();
- private String inputCasReferenceId;
-
+ private long seqNo;
+
private int numberOfParallelDelegates = 1;
private Delegate lastDelegate = null;
@@ -222,6 +227,9 @@ public class LocalCache extends Concurre
private Endpoint freeCasNotificationEndpoint;
+ // client endpoint where the input CAS will be returned
+ private Endpoint clientEndpoint;
+
private volatile boolean deliveryToClientFailed;
private String hostIpProcessingCAS;
@@ -249,6 +257,12 @@ public class LocalCache extends Concurre
protected Map<String, AEMetrics > casMetrics = new TreeMap<String, AEMetrics>();
+ public Endpoint getClientEndpoint() {
+ return clientEndpoint;
+ }
+ public void setClientEndpoint(Endpoint ce) {
+ clientEndpoint = ce;
+ }
public boolean waitingForChildrenToFinish() {
return waitingForChildren;
}
@@ -297,7 +311,12 @@ public class LocalCache extends Concurre
public void setDeliveryToClientFailed() {
this.deliveryToClientFailed = true;
}
-
+ public void setSequenceNumber(long seq) {
+ this.seqNo = seq;
+ }
+ public long getSequenceNumber() {
+ return this.seqNo;
+ }
public boolean isDropped() {
return dropped;
}
@@ -330,15 +349,20 @@ public class LocalCache extends Concurre
return casReferenceId;
}
- public String getInputCasReferenceId() {
- return inputCasReferenceId;
+ public String getParentCasReferenceId() {
+ return parentCasReferenceId;
}
- public void setInputCasReferenceId(String anInputCasReferenceId) {
- inputCasReferenceId = anInputCasReferenceId;
+ public void setParentCasReferenceId(String parentCasReferenceId) {
+ this.parentCasReferenceId = parentCasReferenceId;
subordinateCAS = true;
}
-
+ public String getInputCasReferenceId() {
+ return inputCasReferenceId;
+ }
+ public void setInputCasReferenceId(String inputCasId) {
+ inputCasReferenceId = inputCasId;
+ }
public void setWaitingForRelease(boolean flag) {
waitingForRealease = flag;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon Feb 26 18:54:11 2018
@@ -63,6 +63,7 @@ import org.apache.uima.analysis_engine.A
import org.apache.uima.analysis_engine.AnalysisEngineManagement;
import org.apache.uima.analysis_engine.CasIterator;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
+import org.apache.uima.as.client.DirectMessageContext;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
import org.apache.uima.collection.CollectionReaderDescription;
@@ -263,7 +264,7 @@ public class PrimitiveAnalysisEngineCont
}
}
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, paramsMap);
-
+ System.out.println("------------ initializeAnalysisEngine()-Created instance of AE:"+getComponentName()+" Thread iD:"+Thread.currentThread().getId());
super.addUimaObject(ae.getManagementInterface().getUniqueMBeanName());
// Call to produceAnalysisEngine() may take a long time to complete. While this
// method was executing, the service may have been stopped. Before continuing
@@ -476,7 +477,7 @@ public class PrimitiveAnalysisEngineCont
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "postInitialize",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_initialized_controller__INFO",
- new Object[] { getComponentName() });
+ new Object[] { getComponentName(), super.getBrokerURL() });
}
super.serviceInitialized = true;
}
@@ -549,6 +550,7 @@ public class PrimitiveAnalysisEngineCont
"UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getComponentName() });
}
getServicePerformance().incrementAnalysisTime(super.getCpuTime() - start);
+ /*
if (!anEndpoint.isRemote()) {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.CollectionProcessComplete,
@@ -558,6 +560,8 @@ public class PrimitiveAnalysisEngineCont
} else {
getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
}
+*/
+ getOutputChannel(anEndpoint).sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint, null, false);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
@@ -741,7 +745,14 @@ public class PrimitiveAnalysisEngineCont
aem.getNumberOfCASesProcessed());
}
-
+ private void cancelStackDumpTimer(StackDumpTimer stackDumpTimer) {
+ if ( stackDumpTimer != null ) {
+ stackDumpTimer.cancel();
+ stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
+ // it in case an exception happens below
+ }
+
+ }
/**
* This is called when a Stop request is received from a client. Add the provided Cas id to the
* list of aborted CASes. The process() method checks this list to determine if it should continue
@@ -758,9 +769,9 @@ public class PrimitiveAnalysisEngineCont
if (stopped) {
return;
}
-
- List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
- List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
+ System.out.println("Service:"+getComponentName()+" CAS:"+aCasReferenceId+" CAS Hashcode:"+aCAS.hashCode());
+ List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects = new ArrayList<>();
+ List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects = new ArrayList<>();
CasStateEntry parentCasStateEntry = null;
// If enabled, keep a reference to a timer which
// when it expires, will cause a JVM to dump a stack
@@ -810,19 +821,15 @@ public class PrimitiveAnalysisEngineCont
}
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
- if ( stackDumpTimer != null ) {
- stackDumpTimer.cancel();
- stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
- // it in case an exception happens below
- }
-
+ cancelStackDumpTimer(stackDumpTimer);
// Store how long it took to call processAndOutputNewCASes()
totalProcessTime = (super.getCpuTime() - time);
long sequence = 1;
long hasNextTime = 0; // stores time in hasNext()
- long getNextTime = 0; // stores time in next();
+ long getNextTime = 0; // stores time in next()
boolean moreCASesToProcess = true;
boolean casAbortedDueToExternalRequest = false;
+
while (moreCASesToProcess) {
long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
hasNextTime = super.getCpuTime();
@@ -831,23 +838,25 @@ public class PrimitiveAnalysisEngineCont
// method is allowed to complete. If the method is not complete in allowed window
// the heap and stack trace dump of all threads will be produced.
stackDumpTimer = ifEnabledStartHeapDumpTimer();
+
+ /* ********************************************** */
+ /* CHECK IF THERE ARE MORE CHILD CASes TO PROCESS */
+ /* ********************************************** */
if (!casIterator.hasNext()) {
moreCASesToProcess = false;
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
totalProcessTime += timeToProcessCAS;
- if ( stackDumpTimer != null ) {
- stackDumpTimer.cancel();
- stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
- // it in case an exception happens below
- }
+ cancelStackDumpTimer(stackDumpTimer);
+ /* ************************************* */
+ /* WE ARE DONE PROCESSING INPUT CAS HERE */
+ /* ************************************* */
break; // from while
}
- if ( stackDumpTimer != null ) {
- stackDumpTimer.cancel();
- stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
- // it in case an exception happens below
- }
+
+
+ cancelStackDumpTimer(stackDumpTimer);
+
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
getNextTime = super.getCpuTime();
@@ -856,17 +865,20 @@ public class PrimitiveAnalysisEngineCont
// method is allowed to complete. If the method is not complete in allowed window
// the heap and stack trace dump of all threads will be produced.
stackDumpTimer = ifEnabledStartHeapDumpTimer();
- CAS casProduced = casIterator.next();
- if ( stackDumpTimer != null ) {
- stackDumpTimer.cancel();
- stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
- // it in case an exception happens below
- }
+
+ /* ****************************** */
+ /* GET THE NEXT CHILD CAS */
+ /* ****************************** */
+ CAS childCAS = casIterator.next();
+
+ cancelStackDumpTimer(stackDumpTimer);
+
// Add how long it took to call next()
timeToProcessCAS += (super.getCpuTime() - getNextTime);
// Add time to call hasNext() and next() to the running total
totalProcessTime += timeToProcessCAS;
casAbortedDueToExternalRequest = abortGeneratingCASes(aCasReferenceId);
+
// If the service is stopped or aborted, stop generating new CASes and just return the input
// CAS
if (stopped || casAbortedDueToExternalRequest) {
@@ -891,8 +903,8 @@ public class PrimitiveAnalysisEngineCont
// We are either stopping the service or aborting input CAS due to explicit STOP
// request
// from a client. If a new CAS was produced, release it back to the pool.
- if (casProduced != null) {
- casProduced.release();
+ if (childCAS != null) {
+ childCAS.release();
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -935,24 +947,22 @@ public class PrimitiveAnalysisEngineCont
// OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
MessageContext mContext = getInProcessCache()
.getMessageAccessorByReference(aCasReferenceId);
- CacheEntry newEntry = getInProcessCache().register(casProduced, mContext /*, otsd*/);
+ CacheEntry newEntry = getInProcessCache().register(childCAS, mContext /*, otsd*/);
// if this Cas Multiplier is not Top Level service, add new Cas Id to the private
// cache of the parent aggregate controller. The Aggregate needs to know about
// all CASes it has in play that were generated from the input CAS.
CasStateEntry childCasStateEntry = null;
if (!isTopLevelComponent()) {
- newEntry.setNewCas(true, parentController.getComponentName());
- // Create CAS state entry in the aggregate's local cache
- childCasStateEntry = parentController.getLocalCache().createCasStateEntry(
- newEntry.getCasReferenceId());
- // Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
- // number of child CASes associated with it.
- parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
- } else {
- childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
- }
+ newEntry.setNewCas(true, parentController.getComponentName());
+ // Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
+ // number of child CASes associated with it.
+ parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
+ }
+ childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
+
// Associate parent CAS (input CAS) with the new CAS.
- childCasStateEntry.setInputCasReferenceId(aCasReferenceId);
+ childCasStateEntry.setParentCasReferenceId(aCasReferenceId);
+ childCasStateEntry.setInputCasReferenceId(parentCasStateEntry.getInputCasReferenceId());
// Increment number of child CASes generated from the input CAS
parentCasStateEntry.incrementSubordinateCasInPlayCount();
parentCasStateEntry.incrementOutstandingFlowCounter();
@@ -960,6 +970,8 @@ public class PrimitiveAnalysisEngineCont
// Associate input CAS with the new CAS
newEntry.setInputCasReferenceId(aCasReferenceId);
newEntry.setCasSequence(sequence);
+ childCasStateEntry.setSequenceNumber(sequence);
+
// Add to the cache how long it took to process the generated (subordinate) CAS
getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -1007,48 +1019,26 @@ public class PrimitiveAnalysisEngineCont
"UIMAEE_exception__WARNING", exx);
}
- if (!anEndpoint.isRemote()) {
- UimaTransport transport = getTransport(anEndpoint.getEndpoint());
- UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
- AsynchAEMessage.Request, getName());
- message.addStringProperty(AsynchAEMessage.CasReference, newEntry.getCasReferenceId());
- message.addStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
- message.addLongProperty(AsynchAEMessage.CasSequence, sequence);
- ServicePerformance casStats = getCasStatistics(aCasReferenceId);
-
- message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
- .getRawCasSerializationTime());
- message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
- .getRawCasDeserializationTime());
- message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
- long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
- message.addLongProperty(AsynchAEMessage.IdleTime, iT);
- if (!stopped) {
- transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
- dropStats(newEntry.getCasReferenceId(), getName());
- }
- } else {
- // Send generated CAS to the remote client
- if (!stopped) {
- getOutputChannel().sendReply(newEntry, anEndpoint);
-
- // Check for delivery failure. The client may have terminated while an input CAS was being processed
- if ( childCasStateEntry.deliveryToClientFailed() ) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
- new Object[] { getComponentName(), aCasReferenceId });
- }
- clientUnreachable = true;
- if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
- cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
- }
- // Stop generating new CASes. We failed to send a CAS to a client. Most likely
- // the client has terminated.
- moreCASesToProcess = false; // exit the while loop
-
- dropCAS(childCasStateEntry.getCasReferenceId(), true);
+ // Send generated CAS to the client
+ if (!stopped) {
+ getOutputChannel(anEndpoint).sendReply(childCasStateEntry, anEndpoint);
+
+ // Check for delivery failure. The client may have terminated while an input CAS was being processed
+ if ( childCasStateEntry.deliveryToClientFailed() ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
+ new Object[] { getComponentName(), aCasReferenceId });
}
+ clientUnreachable = true;
+ if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
+ cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
+ }
+ // Stop generating new CASes. We failed to send a CAS to a client. Most likely
+ // the client has terminated.
+ moreCASesToProcess = false; // exit the while loop
+
+ dropCAS(childCasStateEntry.getCasReferenceId(), true);
}
}
// Remove new CAS state entry from the local cache if this is a top level primitive.
@@ -1117,6 +1107,8 @@ public class PrimitiveAnalysisEngineCont
// Create a List to hold per CAS analysisTime and total number of CASes processed
// by each AE. This list will be serialized and sent to the client
List<AnalysisEnginePerformanceMetrics> performanceList =
+ getCasMetricList(parentCasStateEntry, afterAnalysisManagementObjects, beforeAnalysisManagementObjects);
+ /*
new ArrayList<AnalysisEnginePerformanceMetrics>();
// Diff the before process() performance metrics with post process performance
// metrics
@@ -1158,70 +1150,14 @@ public class PrimitiveAnalysisEngineCont
}
}
}
+ */
parentCasStateEntry.getAEPerformanceList().addAll(performanceList);
- if (!anEndpoint.isRemote()) {
- inputCASReturned = true;
- UimaTransport transport = getTransport(anEndpoint.getEndpoint());
-
- if (getInProcessCache() != null && getInProcessCache().getSize() > 0
- && getInProcessCache().entryExists(aCasReferenceId)) {
- try {
- CacheEntry ancestor =
- getInProcessCache().
- getTopAncestorCasEntry(getInProcessCache().getCacheEntryForCAS(aCasReferenceId));
- if ( ancestor != null ) {
- ancestor.addDelegateMetrics(getKey(), performanceList);
- }
- } catch (Exception e) {
- // An exception be be thrown here if the service is being stopped.
- // The top level controller may have already cleaned up the cache
- // and the getCacheEntryForCAS() will throw an exception. Ignore it
- // here, we are shutting down.
- }
- }
-
- UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
- AsynchAEMessage.Response, getName());
- message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- ServicePerformance casStats = getCasStatistics(aCasReferenceId);
- message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
- .getRawCasSerializationTime());
- message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
- .getRawCasDeserializationTime());
- message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
- long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
- message.addLongProperty(AsynchAEMessage.IdleTime, iT);
- // Send reply back to the client. Use internal (non-jms) transport
- if (!stopped) {
- transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
- dropStats(aCasReferenceId, getName());
- }
- } else {
try {
- List<AnalysisEnginePerformanceMetrics> perfMetrics =
- new ArrayList<AnalysisEnginePerformanceMetrics>();
- String aeName = getMetaData().getName();
-
+
CacheEntry entry =
getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- for( AnalysisEnginePerformanceMetrics m : performanceList ) {
- // System.out.println("...............BEFORE: Name:"+m.getName()+" UniqueName:"+m.getUniqueName()+" How Many="+m.getNumProcessed());
- boolean aggregate = m.getUniqueName().startsWith("/"+aeName);
- int pos = m.getUniqueName().indexOf("/",1);
- String uName = m.getUniqueName();
- if ( pos > -1 && aeInstancePool.size() > 1 && aeName != null && aggregate) {
- String st = m.getUniqueName().substring(pos);
- uName = "/"+aeName+st;
- }
- AnalysisEnginePerformanceMetrics newMetrics =
- new AnalysisEnginePerformanceMetrics(m.getName(),uName,m.getAnalysisTime(), m.getNumProcessed());
- // System.out.println("... Metrics - AE:"+metrics.getUniqueName()+" AE Analysis Time:"+metrics.getAnalysisTime());
- perfMetrics.add(newMetrics);
-// System.out.println("...............AFTER: Name:"+newMetrics.getName()+" UniqueName:"+newMetrics.getUniqueName()+" How Many="+newMetrics.getNumProcessed());
-
- }
- entry.addDelegateMetrics(getKey(), perfMetrics); //performanceList);
+ entry.addDelegateMetrics(getKey(), performanceList);
} catch (Exception e) {
// An exception be be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
@@ -1230,12 +1166,11 @@ public class PrimitiveAnalysisEngineCont
}
if (!stopped && !clientUnreachable ) {
- getOutputChannel().sendReply(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+// getOutputChannel(anEndpoint).sendReply(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+ getOutputChannel(anEndpoint).sendReply(getLocalCache().lookupEntry(aCasReferenceId), anEndpoint);
}
inputCASReturned = true;
- }
-
// Remove input CAS state entry from the local cache
if (!isTopLevelComponent()) {
localCache.lookupEntry(aCasReferenceId).setDropped(true);
@@ -1297,13 +1232,67 @@ public class PrimitiveAnalysisEngineCont
((CASImpl) aCAS).enableReset(true);
}
// Remove input CAS cache entry if the CAS has been sent to the client
- dropCAS(aCasReferenceId, true);
+ MessageContext mContext = getInProcessCache()
+ .getMessageAccessorByReference(aCasReferenceId);
+ if ( mContext != null && mContext instanceof DirectMessageContext) {
+ dropCASFromLocal(aCasReferenceId);
+ } else {
+ // Remove input CAS cache entry if the CAS has been sent to the client
+ dropCAS(aCasReferenceId, true);
+ }
+ //dropCAS(aCasReferenceId, true);
+
localCache.dumpContents();
}
}
}
}
+ private List<AnalysisEnginePerformanceMetrics> getCasMetricList(CasStateEntry parentCasStateEntry, List<AnalysisEnginePerformanceMetrics> afterAnalysisList, List<AnalysisEnginePerformanceMetrics> beforeAnalysisList) {
+ List<AnalysisEnginePerformanceMetrics> performanceList =
+ new ArrayList<AnalysisEnginePerformanceMetrics>();
+ // Diff the before process() performance metrics with post process performance
+ // metrics
+ for (AnalysisEnginePerformanceMetrics after : afterAnalysisList) {
+ for( AnalysisEnginePerformanceMetrics before: beforeAnalysisList) {
+ if ( before.getUniqueName().equals(after.getUniqueName())) {
+ boolean found = false;
+ AnalysisEnginePerformanceMetrics metrics = null;
+ for( AnalysisEnginePerformanceMetrics met : parentCasStateEntry.getAEPerformanceList() ) {
+ String un = after.getUniqueName();
+ if ( un.indexOf("Components") >= -1 ) {
+ un = un.substring(un.indexOf("/"));
+ }
+ if ( met.getUniqueName().equals(un)) {
+ long at = after.getAnalysisTime()- before.getAnalysisTime();
+ metrics = new AnalysisEnginePerformanceMetrics(after.getName(),
+ un,//after.getUniqueName(),
+ met.getAnalysisTime()+at,
+ after.getNumProcessed());
+ found = true;
+ parentCasStateEntry.getAEPerformanceList().remove(met);
+ break;
+ }
+ }
+ if ( !found ) {
+ String un = after.getUniqueName();
+
+ if ( un.indexOf("Components") >= -1 ) {
+ un = un.substring(un.indexOf("/"));
+ }
+ metrics = new AnalysisEnginePerformanceMetrics(after.getName(),
+ un,//after.getUniqueName(),
+ after.getAnalysisTime()- before.getAnalysisTime(),
+ after.getNumProcessed());
+
+ }
+ performanceList.add(metrics);
+ break;
+ }
+ }
+ }
+ return performanceList;
+ }
private void addConfigIntParameter(String aParamName, int aParamValue) {
ConfigurationParameter cp = new ConfigurationParameter_impl();
cp.setMandatory(false);
@@ -1414,6 +1403,13 @@ public class PrimitiveAnalysisEngineCont
public void stop() {
super.stop(true); // shutdown now
+ if ( getLocalCache().size() > 0 ) {
+ for( Entry<String, LocalCache.CasStateEntry> entry : getLocalCache().entrySet()) {
+ System.out.println("........... Controller:"+getComponentName()+" - stop() - Releasing CAS:"+entry.getKey());
+ releaseNextCas(entry.getKey());
+ }
+ }
+
if (aeInstancePool != null) {
try {
aeInstancePool.destroy();
@@ -1517,7 +1513,7 @@ public class PrimitiveAnalysisEngineCont
/**
* The HeapDumpTimer is optionally used to dump the heap if a task takes too much time to finish.
- * It is enabled from the System property -DheapDumpThreshold=x where x is a number of seconds
+ * It is enabled from the System property -DheapDumpThreshold=<x> where x is a number of seconds
* the task is allowed to complete. If the task is not completed, the heap dump will be created.
*
*
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Mon Feb 26 18:54:11 2018
@@ -822,6 +822,7 @@ public abstract class Delegate {
* - command for which the timer is started
*/
private void startDelegateGetMetaTimer(final String aCasReferenceId, final int aCommand) {
+ Thread.dumpStack();
synchronized( getMetaTimerLock ) {
final long timeToWait = getTimeoutValueForCommand(aCommand);
Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -43,6 +43,6 @@ public interface ErrorHandler {
public boolean handleError(Throwable t, ErrorContext anErrorContext,
AnalysisEngineController aController);
- public Map getEndpointThresholdMap();
+ public Map<String,Threshold> getEndpointThresholdMap();
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerBase.java Mon Feb 26 18:54:11 2018
@@ -225,7 +225,7 @@ public abstract class ErrorHandlerBase {
if (aController instanceof AggregateAnalysisEngineController && (masterEndpoint != null && masterEndpoint.getStatus() == Endpoint.FAILED)) {
// Fetch an InputChannel that handles messages for a given delegate
- InputChannel iC = aController.getReplyInputChannel(masterEndpoint.getDelegateKey());
+ InputChannel iC = aController.getInputChannel();
// Create a new Listener, new Temp Queue and associate the listener with the Input Channel
iC.createListener(masterEndpoint.getDelegateKey(), null);
iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/ErrorHandlerChain.java Mon Feb 26 18:54:11 2018
@@ -27,11 +27,21 @@ import java.util.Map;
import org.apache.uima.aae.controller.AnalysisEngineController;
-public class ErrorHandlerChain extends LinkedList {
- public ErrorHandlerChain(List aChainofHandlers) {
+public class ErrorHandlerChain extends LinkedList<ErrorHandler> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public ErrorHandlerChain(List<ErrorHandler> aChainofHandlers) {
this.addAll(aChainofHandlers);
}
+ public ErrorHandlerChain() {
+ }
+ public void setErrorHandler(ErrorHandler eh ) {
+ this.add(eh);
+ }
public Map getThresholds() {
Map thresholds = new HashMap();
Iterator iterator = this.iterator();
@@ -51,7 +61,7 @@ public class ErrorHandlerChain extends L
if (t instanceof AsynchAEException && t.getCause() != null) {
cause = t.getCause();
}
- Iterator iterator = this.iterator();
+ Iterator<ErrorHandler> iterator = this.iterator();
while (errorHandled == false && iterator.hasNext()) {
ErrorHandler handler = ((ErrorHandler) iterator.next());
errorHandled = handler.handleError(cause, anErrorContext, aController);
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/Thresholds.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,89 @@
+package org.apache.uima.aae.error;
+
+import java.util.Map;
+
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.GetMetadataErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+
+public class Thresholds {
+ public static Threshold newThreshold() {
+ Threshold t = new Threshold();
+ t.setAction("terminate");
+ t.setContinueOnRetryFailure(false);
+ t.setMaxRetries(0);
+ t.setThreshold(1);
+ t.setWindow(0);
+ return t;
+ }
+ public static Threshold getThreshold(String action, int maxRetries ) {
+ Threshold t1 = newThreshold();
+ t1.setAction(action);
+ t1.setContinueOnRetryFailure(false);
+ t1.setMaxRetries(maxRetries);
+ t1.setThreshold(1);
+
+ return t1;
+ }
+ public static Threshold getThresholdFor(ProcessCasErrorsType processCasErrors) {
+ Threshold t;
+ if ( processCasErrors == null ) {
+ t = newThreshold();
+ } else {
+ t = getThreshold(processCasErrors.getThresholdAction(), processCasErrors.getMaxRetries() );
+ t.setThreshold(processCasErrors.getThresholdCount());
+ t.setContinueOnRetryFailure(Boolean.valueOf(processCasErrors.getContinueOnRetryFailure()));
+ t.setWindow(processCasErrors.getThresholdWindow());
+ }
+ return t;
+ }
+ public static Threshold getThresholdFor(GetMetadataErrorsType metaErrors) {
+ Threshold t;
+ if ( metaErrors == null ) {
+ t = newThreshold();
+ } else {
+ //metaErrors.get
+ t = getThreshold(metaErrors.getErrorAction(), metaErrors.getMaxRetries() );
+
+ }
+ return t;
+ }
+ public static Threshold getThresholdFor(CollectionProcessCompleteErrorsType cpcErrors) {
+ Threshold t;
+ if ( cpcErrors == null ) {
+ t = newThreshold();
+ } else {
+ t = getThreshold(cpcErrors.getAdditionalErrorAction(), 0);
+ }
+ return t;
+ }
+ public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, GetMetadataErrorsType errorHandler, Map<String, Threshold> thresholdMap) {
+ if ( errorHandler != null ) {
+ Threshold t = getThresholdFor(errorHandler);
+ delegate.setGetMetaTimeout(errorHandler.getTimeout());
+ thresholdMap.put(delegate.getKey(), t);
+ } else {
+ thresholdMap.put(delegate.getKey(), newThreshold());
+ }
+ }
+ public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, ProcessCasErrorsType errorHandler, Map<String, Threshold> thresholdMap) {
+ if ( errorHandler != null ) {
+ Threshold t = getThresholdFor(errorHandler);
+ delegate.setProcessTimeout(errorHandler.getTimeout());
+ thresholdMap.put(delegate.getKey(), t);
+ } else {
+ thresholdMap.put(delegate.getKey(), newThreshold());
+ }
+ }
+ public static void addDelegateErrorThreshold(AnalysisEngineDelegate delegate, CollectionProcessCompleteErrorsType errorHandler, Map<String, Threshold> thresholdMap) {
+ if ( errorHandler != null ) {
+ Threshold t = getThresholdFor(errorHandler);
+ delegate.setProcessTimeout(errorHandler.getTimeout());
+ thresholdMap.put(delegate.getKey(), t);
+ } else {
+ thresholdMap.put(delegate.getKey(), newThreshold());
+ }
+ }
+
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/CpcErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -76,7 +76,7 @@ public class CpcErrorHandler extends Err
if (aController instanceof AggregateAnalysisEngineController) {
endpoint = ((AggregateAnalysisEngineController) aController).getClientEndpoint();
}
- aController.getOutputChannel().sendReply(t, null, null, endpoint,
+ aController.getOutputChannel(endpoint).sendReply(t, null, null, endpoint,
AsynchAEMessage.CollectionProcessComplete);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon Feb 26 18:54:11 2018
@@ -113,13 +113,14 @@ public class ProcessCasErrorHandler exte
if (anEndpoint != null && aCasReferenceId != null && !anEndpoint.isCasMultiplier()) {
try {
if (!anEndpoint.isRemote()) {
+ /*
anEndpoint.setReplyEndpoint(true);
UimaTransport vmTransport = aController.getTransport(anEndpoint.getEndpoint());
UimaMessage message = vmTransport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Response, aController.getName());
message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-
+*/
Throwable wrapper = null;
if (!(t instanceof UimaEEServiceException)) {
// Strip off AsyncAEException and replace with UimaEEServiceException
@@ -129,6 +130,7 @@ public class ProcessCasErrorHandler exte
wrapper = new UimaEEServiceException(t);
}
}
+ /*
if (wrapper == null) {
message.addObjectProperty(AsynchAEMessage.Cargo, t);
} else {
@@ -138,6 +140,16 @@ public class ProcessCasErrorHandler exte
vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
aController.dropStats(aCasReferenceId, aController.getName());
}
+ */
+ if (!aController.isStopped()) {
+ aController.dropStats(aCasReferenceId, aController.getName());
+ }
+ if (wrapper == null) {
+ aController.getOutputChannel(anEndpoint).sendReply(t, aCasReferenceId, null, anEndpoint, AsynchAEMessage.Process);
+ } else {
+ aController.getOutputChannel(anEndpoint).sendReply(wrapper, aCasReferenceId, null, anEndpoint, AsynchAEMessage.Process);
+ }
+
} else {
CasStateEntry stateEntry = null;
String parentCasReferenceId = null;
@@ -152,7 +164,7 @@ public class ProcessCasErrorHandler exte
}
if (!aController.isStopped()) {
- aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId,
+ aController.getOutputChannel(anEndpoint).sendReply(t, aCasReferenceId, parentCasReferenceId,
anEndpoint, AsynchAEMessage.Process);
}
}
@@ -516,7 +528,7 @@ public class ProcessCasErrorHandler exte
doSendReplyToClient = false;
// Check if the CAS is a subordinate (has parent CAS).
if (casStateEntry != null && casStateEntry.isSubordinate()) {
- String parentCasReferenceId = casStateEntry.getInputCasReferenceId();
+ String parentCasReferenceId = casStateEntry.getParentCasReferenceId();
if (parentCasReferenceId != null) {
try {
CacheEntry parentCasCacheEntry = aController.getInProcessCache().getCacheEntryForCAS(
@@ -542,7 +554,7 @@ public class ProcessCasErrorHandler exte
parentCasStateEntry.setFailed();
while (predecessorCas != null && predecessorCas.isSubordinate()) {
predecessorCas = aController.getLocalCache().lookupEntry(
- predecessorCas.getInputCasReferenceId());
+ predecessorCas.getParentCasReferenceId());
predecessorCas.setFailed();
}
predecessorCas.addThrowable(t);
@@ -648,7 +660,7 @@ public class ProcessCasErrorHandler exte
cmEndpoint.setReplyEndpoint(true);
cmEndpoint.setIsCasMultiplier(true);
cmEndpoint.setFreeCasEndpoint(true);
- aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS,
+ aController.getOutputChannel(cmEndpoint).sendRequest(AsynchAEMessage.ReleaseCAS,
cacheEntry.getCasReferenceId(), cmEndpoint);
}
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java?rev=1825401&r1=1825400&r2=1825401&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java Mon Feb 26 18:54:11 2018
@@ -31,7 +31,7 @@ import org.apache.uima.aae.message.Messa
import org.apache.uima.util.Level;
public class MetadataRequestHandler_impl extends HandlerBase {
- private static final Class CLASS_NAME = MetadataRequestHandler_impl.class;
+ private static final Class<?> CLASS_NAME = MetadataRequestHandler_impl.class;
public MetadataRequestHandler_impl(String aName) {
super(aName);