You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/10/18 14:12:01 UTC
svn commit: r1844241 [2/4] - in /uima/uima-as/branches/uima-as-3:
aggregate-uima-as/ uima-as-parent/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/
uimaj-as-activ...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,267 @@
+package org.apache.uima.aae.component.dd;
+
+import java.io.File;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.CasMultiplierComponent;
+import org.apache.uima.aae.component.CasMultiplierNature;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.factory.AnalysisEngineComponentFactory;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public class DeploymentDescriptorProcessor {
+
+ private AnalysisEngineDeploymentDescriptionDocument dd = null;
+ public DeploymentDescriptorProcessor() {
+
+ }
+ public DeploymentDescriptorProcessor(AnalysisEngineDeploymentDescriptionDocument dd) {
+ this.dd = dd;
+ }
+ public AnalysisEngineComponent newComponent(String descriptorPath) throws Exception {
+ this.dd = parseDD(descriptorPath);
+ return newComponent();
+ }
+
+ public TopLevelServiceComponent newComponent() throws Exception {
+ ServiceType service =
+ dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+ XmlDocumentProperties dp = dd.documentProperties();
+ System.out.println(dp.getSourceName());
+
+ // get absolute path to resource specifier
+ String aeDescriptor =
+ UimaASUtils.fixPath(dp.getSourceName(), getDescriptor(service));
+
+ // Get top level uima resource specifier
+ ResourceSpecifier resourceSpecifier =
+ UimaClassFactory.produceResourceSpecifier(aeDescriptor);
+
+ AnalysisEngineComponentFactory componentFactory =
+ new AnalysisEngineComponentFactory();
+ // Process top level AE resource specifier and all its delegates.
+ // For aggregates, recursively walk through a delegate tree, producing
+ // a tree of AnalysisEngineComponent instances, one for every delegate.
+
+ AnalysisEngineComponent aeComponent =
+ componentFactory.produce(resourceSpecifier, null);
+
+ // Decorate above with top level component functionality
+ TopLevelServiceComponent topLevelComponent =
+ new TopLevelServiceComponent(aeComponent, dd);
+
+// if ( aeComponent.isPrimitive()) {
+// // The AE descriptor is for a primitive AE
+// } else {
+ // the AE descriptor is for an aggregate AE. Check DD to
+ // see if its an async aggregate. Its async=true or
+ // has delegates.
+// if ( isAggregate(service.getAnalysisEngine()) ) {
+ if ( topLevelComponent.isAggregate()) {
+ // All delegates will be colocated unless
+ // a delegate is remote. That is determined
+ // below.
+// aeComponent.enableAsync();
+ //if ( dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol()
+ DelegateAnalysisEngineType[] colocatedDelegates = null;
+ if ( Objects.nonNull(service.getAnalysisEngine()) &&
+ Objects.nonNull(service.getAnalysisEngine().getDelegates()) ) {
+ colocatedDelegates = service.getAnalysisEngine().
+ getDelegates().
+ getAnalysisEngineArray();
+
+ }
+ handleColocatedDelegates(colocatedDelegates, aeComponent.getChildren());
+
+ RemoteAnalysisEngineType[] remoteDelegates = null;
+ if ( Objects.nonNull(service.getAnalysisEngine()) &&
+ Objects.nonNull(service.getAnalysisEngine().getDelegates())) {
+ remoteDelegates = service.getAnalysisEngine().
+ getDelegates().
+ getRemoteAnalysisEngineArray();
+ }
+ handleRemoteDelegates(remoteDelegates, aeComponent.getChildren());
+
+// service.getAnalysisEngine().
+// getDelegates().
+ //}
+ }
+
+
+ return topLevelComponent;
+ }
+ public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+ return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));
+
+ }
+ private boolean isAggregate(AnalysisEngineType aet) {
+ return ("true".equals(aet.getAsync()) || aet.isSetAsync() || aet.isSetDelegates());
+ }
+ private String getDescriptor(ServiceType service) {
+ String aeDescriptor = service.getTopDescriptor().getImport().getLocation();
+ if ( aeDescriptor == null ) {
+ aeDescriptor = service.getTopDescriptor().getImport().getName();
+ }
+ return aeDescriptor;
+ }
+
+ private void markAllDelegatesAsAsync(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+ for ( AnalysisEngineComponent aec : resourceSpecifierDelegates ) {
+ if ( !aec.isPrimitive() ) {
+ handleColocatedDelegates(null, aec.getChildren());
+ }
+ aec.enableAsync();
+ }
+
+ }
+ private void handleColocatedDelegates(DelegateAnalysisEngineType[] ddDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+ if ( Objects.isNull(ddDelegates)) {
+ // the dd does not include delegates but is configured as an asynch service
+ // so process resource specifiers recursively marking each part of a pipeline
+ // as asynch so that it is deployed as a collocated asynch service.
+ handleDefaultColocatedDelegates(resourceSpecifierDelegates);
+ //markAllDelegatesAsAsync(resourceSpecifierDelegates);
+ return;
+ }
+ // go through all delegates defined in the deployment descriptor (dd)
+ for( DelegateAnalysisEngineType ddDelegate : ddDelegates ) {
+ // find a matching delegate in the AE resource specifier
+ for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) {
+ if ( ddDelegate.getKey().equals(resourceSpecifierDelegate.getKey())) {
+ if ( resourceSpecifierDelegate.isCasMultiplier() && Objects.nonNull(ddDelegate.getCasMultiplier())) {
+ // plugin cas multiplier settings from dd
+ CasMultiplierNature casMultiplier =
+ new CasMultiplierComponent(ddDelegate.getCasMultiplier().getDisableJCasCache(),
+ TypeConverter.convertStringToLong(ddDelegate.getCasMultiplier().getInitialFsHeapSize(), 1000),
+ ddDelegate.getCasMultiplier().getPoolSize(),
+ TypeConverter.convertStringToBoolean(ddDelegate.getCasMultiplier().getProcessParentLast(),true) );
+ resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+
+ }
+ resourceSpecifierDelegate.enableAsync(); // delegate is async
+
+ resourceSpecifierDelegate.
+ withScaleout(Objects.isNull(ddDelegate.getScaleout()) ? 1 :ddDelegate.getScaleout().getNumberOfInstances()).
+ withRequestThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInputQueueScaleout(), 1)).
+ withReplyThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInternalReplyQueueScaleout(),1));
+
+ if ( isAggregate(ddDelegate) ) {
+
+ resourceSpecifierDelegate.enableAsync();
+ for ( AnalysisEngineComponent aec : resourceSpecifierDelegate.getChildren() ) {
+ aec.enableAsync();
+ }
+ if ( ddDelegate.getDelegates() != null ) {
+ // recursively process collocated delegates
+ handleColocatedDelegates(ddDelegate.getDelegates().getAnalysisEngineArray() , resourceSpecifierDelegate.getChildren());
+ handleRemoteDelegates(ddDelegate.getDelegates().getRemoteAnalysisEngineArray(), resourceSpecifierDelegate.getChildren());
+ }
+
+ }
+ break; // found a match and completed processing it. We are done with it.
+ }
+ }
+ }
+ }
+
+ private void handleDefaultColocatedDelegates(List<AnalysisEngineComponent> resourceSpecifierDelegates) {
+ // find a matching delegate in the AE resource specifier
+ for (AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates) {
+ if (resourceSpecifierDelegate.isCasMultiplier()) {
+ // plugin cas multiplier settings from dd
+ CasMultiplierNature casMultiplier = new CasMultiplierComponent(false, 1000, 1, true);
+ resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+ }
+ resourceSpecifierDelegate.withScaleout(1).
+ withRequestThreadPoolSize(1).
+ withReplyThreadPoolSize(1).
+ enableAsync();
+
+ if (!resourceSpecifierDelegate.isPrimitive()) {
+ handleDefaultColocatedDelegates(resourceSpecifierDelegate.getChildren());
+ }
+ }
+ }
+ private void handleRemoteDelegates(RemoteAnalysisEngineType[] remoteDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+ if ( Objects.isNull(remoteDelegates) ) {
+ return;
+ }
+ for( RemoteAnalysisEngineType remoteDelegateType : remoteDelegates ) {
+ // find a matching delegate in the AE resource specifier
+ for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) {
+ if ( remoteDelegateType.getKey().equals(resourceSpecifierDelegate.getKey())) {
+ // find an index of the current component in the list. We
+ // will decorate this component as a remote, and replace
+ // it in the list.
+ int index =
+ resourceSpecifierDelegates.indexOf(resourceSpecifierDelegate);
+ // Decorate existing component with remote flavor
+ RemoteAnalysisEngineComponent remoteDelegate =
+ new RemoteAnalysisEngineComponent(resourceSpecifierDelegate, remoteDelegateType);
+
+ //replace component with decorated remote
+ resourceSpecifierDelegates.set(index, remoteDelegate);
+ }
+
+ }
+
+ }
+ }
+/*
+ public void parse(DelegateAnalysisEngineType colocatedDelegate, AnalysisEngineComponent component) {
+ if ( isAggregate(colocatedDelegate) ) {
+ DelegateAnalysisEngineType[] colocatedDelegates =
+ colocatedDelegate.getDelegates().getAnalysisEngineArray();
+ handleColocatedDelegates(colocatedDelegates, component.getChildren());
+ } else {
+
+ }
+ }
+*/
+ public static void main(String[] args) {
+ try {
+ DeploymentDescriptorProcessor ddp =
+ new DeploymentDescriptorProcessor();
+ ddp.newComponent(args[0]);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ private static class TypeConverter {
+ private static int convertStringToInt(String value, int defaultValue) {
+ int returnValue = defaultValue;
+ try {
+ returnValue = Integer.parseInt(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ private static boolean convertStringToBoolean(String value, boolean defaultValue) {
+ boolean returnValue = defaultValue;
+ try {
+ returnValue = Boolean.parseBoolean(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ private static long convertStringToLong(String value, long defaultValue) {
+ long returnValue = defaultValue;
+ try {
+ returnValue = Long.parseLong(value);
+ } catch( Exception e) {
+ }
+ return returnValue;
+ }
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.component.factory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.PrimitiveAnalysisEngineComponent;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class AnalysisEngineComponentFactory {
+
+ private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) {
+ return (AnalysisEngineDescription) rs;
+ }
+
+ public AnalysisEngineComponent produce(ResourceSpecifier rs, String key) throws Exception {
+ AnalysisEngineDescription aeDescriptor = getAeDescription(rs);
+ AnalysisEngineComponent component = null;
+
+ if ( aeDescriptor.isPrimitive() ) {
+ component = new PrimitiveAnalysisEngineComponent(key, rs);
+ } else {
+ component =
+ new AggregateAnalysisEngineComponent(key, rs);
+ Map<String, ResourceSpecifier> delegates =
+ aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+ for(Entry<String, ResourceSpecifier> delegateEntry: delegates.entrySet() ) {
+ component.add(produce(delegateEntry.getValue(), delegateEntry.getKey() ));
+ }
+ }
+
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) {
+ component.enableScaleout();
+ }
+ if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) {
+ component.enableCasMultipler();
+ }
+
+ return component;
+ }
+ public static void main(String[] args ) {
+ try {
+ AnalysisEngineComponentFactory factory =
+ new AnalysisEngineComponentFactory();
+ ResourceSpecifier resourceSpecifier =
+ UimaClassFactory.produceResourceSpecifier(args[0]);
+ factory.produce(resourceSpecifier, "TopLevel");
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Oct 18 14:12:00 2018
@@ -2028,25 +2028,54 @@ implements
return retValue;
}
- private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
-
- // Get the key of the Cas Producer
- String casProducer = cacheEntry.getCasProducerAggregateName();
- // CAS is considered new from the point of view of this service IF it was produced by it
- boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(
- casProducer));
- if (parent != null && parent.isFailed() && isNewCas) {
- return true; // no point to continue if the CAS was produced in this aggregate and its parent
- // failed here
- }
- // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR
- // this component
- // is not a Cas Multiplier
- if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
- return true;
- }
- return false;
- }
+ private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) {
+
+ // Get the key of the Cas Producer
+ String casProducer = cacheEntry.getCasProducerAggregateName();
+ // CAS is considered new from the point of view of this service IF it was
+ // produced by it
+ boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+ // force to drop a child CAS if this service is not a Cas Multiplier
+ if ( isNewCas ) {
+ if ( !isCasMultiplier()) {
+ System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS");
+ return true;
+ }
+ if (parent != null && parent.isFailed()) {
+ return true; // no point to continue if the CAS was produced in this aggregate and its parent
+ // failed
+ }
+ // If the CAS was generated by this component but the Flow Controller wants to
+ // drop the CAS OR this component is not a Cas Multiplier
+ if ( parent != null && parent.getSubordinateCasInPlayCount() == 0
+ && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+ return true;
+ }
+ }
+
+
+ /*
+ if (isNewCas && isTopLevelComponent() && !isCasMultiplier()) {
+ System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS");
+ return true;
+ }
+ if (parent != null && parent.isFailed() && isNewCas) {
+ return true; // no point to continue if the CAS was produced in this aggregate and its parent
+ // failed here
+ }
+ // If the CAS was generated by this component but the Flow Controller wants to
+ // drop the CAS OR
+ // this component
+ // is not a Cas Multiplier
+ if (isNewCas && parent.getSubordinateCasInPlayCount() == 0
+ && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
+ return true;
+ }
+
+ */
+
+ return false;
+ }
private boolean casHasExceptions(CasStateEntry casStateEntry) {
return (casStateEntry.getErrors().size() > 0) ? true : false;
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Thu Oct 18 14:12:00 2018
@@ -40,6 +40,7 @@ import org.apache.uima.aae.jmx.JmxManage
import org.apache.uima.aae.jmx.ServiceErrors;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.spi.transport.UimaMessageListener;
import org.apache.uima.aae.spi.transport.UimaTransport;
@@ -55,6 +56,8 @@ public interface AnalysisEngineControlle
public static final String AEInstanceCount = "AEInstanceCount";
+ public Origin getOrigin();
+
public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException;
public ControllerLatch getControllerLatch();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Oct 18 14:12:00 2018
@@ -78,6 +78,8 @@ import org.apache.uima.aae.jmx.ServiceEr
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.UimaAsOrigin;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.MonitorBaseImpl;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -111,6 +113,7 @@ public abstract class BaseAnalysisEngine
JMS,
DIRECT
};
+ private final Origin origin;
private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class;
private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
@@ -284,7 +287,7 @@ public abstract class BaseAnalysisEngine
protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception;
public BaseAnalysisEngineController() {
-
+ origin = new UimaAsOrigin("");
}
public BaseAnalysisEngineController(AnalysisEngineController aParentController,
@@ -316,7 +319,7 @@ public abstract class BaseAnalysisEngine
Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception {
System.out.println("C'tor Called Descriptor:"+aDescriptor);
-
+ origin = new UimaAsOrigin(anEndpointName);
casManager = aCasManager;
inProcessCache = anInProcessCache;
localCache = new LocalCache(this);
@@ -529,7 +532,9 @@ public abstract class BaseAnalysisEngine
return uimaContext;
}
-
+ public Origin getOrigin() {
+ return origin;
+ }
public void setThreadFactory(ThreadPoolTaskExecutor factory) {
threadFactory = factory;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Thu Oct 18 14:12:00 2018
@@ -21,6 +21,7 @@ package org.apache.uima.aae.controller;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.TypeSystemImpl;
@@ -31,6 +32,12 @@ public interface Endpoint {
public static final int DISABLED = 3;
+ public void setMessageOrigin(Origin origin);
+
+ public Origin getMessageOrigin();
+
+ public String getUniqueId();
+
public boolean isJavaRemote();
public void setJavaRemote();
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Thu Oct 18 14:12:00 2018
@@ -20,11 +20,13 @@
package org.apache.uima.aae.controller;
import java.util.Timer;
+import java.util.UUID;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.TypeSystemImpl;
import org.apache.uima.resource.ResourceSpecifier;
@@ -32,6 +34,8 @@ import org.apache.uima.resource.Resource
public class Endpoint_impl implements Endpoint, Cloneable {
private static final Class<?> CLASS_NAME = Endpoint_impl.class;
+ private String uniqueId = UUID.randomUUID().toString();
+
private volatile boolean javaRemote=false;
private volatile Object destination = null;
@@ -130,6 +134,18 @@ public class Endpoint_impl implements En
private ResourceSpecifier resourceSpecifier;
+ private Origin messageOrigin;
+
+ public void setMessageOrigin(Origin origin) {
+ this.messageOrigin = origin;
+ }
+
+ public Origin getMessageOrigin() {
+ return messageOrigin;
+ }
+ public String getUniqueId() {
+ return uniqueId;
+ }
public void setJavaRemote() {
javaRemote = true;
}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.message.MessageProcessor;
+
+public abstract class AbstractUimaAsConsumer implements UimaAsConsumer{
+
+ protected abstract void setMessageProcessor(MessageProcessor processor);
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,5 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ComponentConnector {
+ public Object getConnectionInfo();
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,31 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.aae.definition.connectors.basic.DirectConnector;
+
+public class ConnectorFactory {
+
+ public static ComponentConnector newConnector(String protocol, String vendor) {
+ ComponentConnector connector=null;
+ switch(protocol.toLowerCase()) {
+ case "jms":
+ connector = getJmsConnector(vendor);
+ break;
+ case "direct":
+ connector = new DirectConnector();
+ break;
+
+ default:
+ connector = new BasicConnector();
+ }
+ return connector;
+ }
+ private static ComponentConnector getJmsConnector(String vendor) {
+ return null;
+ }
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,7 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ListenerCallback {
+ public void onInitializationError(Exception e);
+ public boolean failedInitialization();
+ public Exception getException();
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,11 @@
+package org.apache.uima.aae.definition.connectors;
+
+import java.util.Map;
+
+public interface UimaAsConnector {
+
+ public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> params)
+ throws Exception;
+
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,17 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsConsumer extends Lifecycle {
+ public enum ConsumerType {GetMeta,ProcessCAS,Cpc,FreeCAS,Reply,Info};
+
+ public void initialize() throws Exception;
+ public void initialize(AnalysisEngineController controller) throws Exception;
+
+ public void consume(DirectMessage message) throws Exception;
+
+ public ConsumerType getType();
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,15 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.message.MessageContext;
+
+public interface UimaAsEndpoint extends Lifecycle {
+ public UimaAsProducer createProducer(String targetUri) throws Exception;
+ public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception;
+ public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception;
+ public void dispatch(MessageContext messageContext) throws Exception;
+ public UimaAsConsumer getConsumer(String targetUri, ConsumerType type);
+ public MessageContext createMessage(int command, int messageType, Endpoint endpoint);
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsMessage;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsProducer extends Lifecycle {
+
+ public void dispatch(DirectMessage message) throws Exception;
+ public void dispatch(DirectMessage message, UimaAsConsumer target) throws Exception;
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class BasicConnector implements ComponentConnector {
+
+ @Override
+ public Object getConnectionInfo() {
+ return "";
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class DirectConnector implements ComponentConnector {
+
+ @Override
+ public Object getConnectionInfo() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.definition.connectors.jms;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class ActiveMqConnector implements ComponentConnector {
+ private final Object connection;
+
+ ActiveMqConnector(String queue, String broker, int prefetch) {
+ connection = new ActiveMqConnection(queue, broker, prefetch);
+ }
+ @Override
+ public Object getConnectionInfo() {
+ return connection;
+ }
+
+ public class ActiveMqConnection {
+ private final String queueName;
+ private final String brokerUrl;
+ private final int prefetch;
+
+ ActiveMqConnection(String queue, String broker, int prefetch) {
+ this.queueName = queue;
+ this.brokerUrl = broker;
+ this.prefetch = prefetch;
+ }
+ public String getQueueName() {
+ return queueName;
+ }
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+ public int getPrefetch() {
+ return prefetch;
+ }
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
+public interface MessageProcessor {
+
+ public void process(MessageContext message) throws Exception;
+ public AnalysisEngineController getController();
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,6 @@
+package org.apache.uima.aae.message;
+
+public interface Origin {
+ public String getUniqueId();
+ public String getName();
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+
+public interface UimaAsMessage {
+ public enum Command {GetMetaRequest, GetMetaResponse, CpcRegeuest, CpcResponse, ProcessRequest, ProcessResponse };
+
+ public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException;
+
+ public Endpoint getEndpoint();
+
+ public String getStringMessage() throws AsynchAEException;
+
+ public Object getObjectMessage() throws AsynchAEException;
+
+ public byte[] getByteMessage() throws AsynchAEException;
+
+ public Object getRawMessage();
+
+ public boolean propertyExists(String aKey) throws AsynchAEException;
+
+ public void setMessageArrivalTime(long anArrivalTime);
+
+ public long getMessageArrivalTime();
+
+ public String getEndpointName();
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java?rev=1844241&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.message;
+
+import java.util.UUID;
+
+public class UimaAsOrigin implements Origin {
+
+ private final String uniqueId = UUID.randomUUID().toString();
+ private final String name;
+
+ public UimaAsOrigin(String name) {
+ this.name = name;
+ }
+ @Override
+ public String getUniqueId() {
+ return uniqueId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + ((uniqueId == null) ? 0 : uniqueId.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ UimaAsOrigin other = (UimaAsOrigin) obj;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (uniqueId == null) {
+ if (other.uniqueId != null)
+ return false;
+ } else if (!uniqueId.equals(other.uniqueId))
+ return false;
+ return true;
+ }
+ @Override
+ public String getName() {
+ return name;
+ }
+ @Override
+ public String toString() {
+ return "Origin[name: " + name + "] [id:"+uniqueId+"]";
+ }
+}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java Thu Oct 18 14:12:00 2018
@@ -20,6 +20,7 @@ package org.apache.uima.aae.service;
import java.util.concurrent.BlockingQueue;
+import org.apache.uima.aae.InProcessCache;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.as.client.DirectMessage;
import org.apache.uima.cas.CAS;
@@ -34,6 +35,7 @@ public interface UimaASService {
public static final int STOP_NOW = 1001;
public String getEndpoint();
+ public int getScaleout();
public String getId();
public void start() throws Exception;
public void stop() throws Exception;
@@ -46,5 +48,5 @@ public interface UimaASService {
public void releaseCAS(String casReferenceId, BlockingQueue<DirectMessage> releaseCASQueue ) throws Exception;
public AnalysisEngineMetaData getMetaData() throws Exception;
public void removeFromCache(String casReferenceId);
-
+ public UimaASService withInProcessCache(InProcessCache cache);
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java Thu Oct 18 14:12:00 2018
@@ -24,7 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,9 +38,15 @@ import org.apache.uima.aae.OutputChannel
import org.apache.uima.aae.UimaASUtils;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.ComponentCasPool;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
import org.apache.uima.aae.controller.DelegateEndpoint;
import org.apache.uima.aae.controller.Endpoint;
@@ -51,7 +58,6 @@ import org.apache.uima.aae.error.ErrorHa
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.Thresholds;
import org.apache.uima.aae.error.Thresholds.Action;
-import org.apache.uima.aae.error.UimaAsDelegateException;
import org.apache.uima.aae.error.handler.CpcErrorHandler;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
@@ -78,7 +84,6 @@ import org.apache.uima.resource.Resource
import org.apache.uima.resource.ResourceManager;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
-import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionType;
import org.apache.uima.resourceSpecifier.AnalysisEngineType;
import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType;
import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
@@ -109,6 +114,246 @@ public abstract class AbstractUimaAsServ
}
protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+
+// public AnalysisEngineController createController( AnalysisEngineComponent component, int howManyInstances) throws Exception {
+ public AnalysisEngineController createController( AnalysisEngineComponent component, ControllerCallbackListener aListener, String serviceId) throws Exception {
+ AnalysisEngineController controller =
+ createController(component, null /*, component.getScaleout() */);
+ controller.setServiceId(serviceId);
+ controller.addControllerCallbackListener(aListener);
+ return controller;
+ }
+
+ /**
+ * Recursively walks through the AE descriptor creating instances of AnalysisEngineController
+ * and linking them in parent-child tree.
+ *
+ * @param d - wrapper around delegate defined in DD (may be null)
+ * @param resourceSpecifier - AE descriptor specifier
+ * @param name - name of the delegate
+ * @param parentController - reference to a parent controller. TopLevel has no parent
+ * @param howManyInstances - scalout for the delegate
+ *
+ * @return
+ * @throws Exception
+ */
+ public AnalysisEngineController createController( AnalysisEngineComponent component, AnalysisEngineController parentController/*, int howManyInstances */) throws Exception {
+
+ AnalysisEngineController controller = null;
+ System.out.println("---------Controller:"+
+ component.getKey()+
+ " resourceSpecifier:"+
+ component.getResourceSpecifier().getClass().getName()+
+ " ResourceCreationSpecifier:"+(component.getResourceSpecifier() instanceof ResourceCreationSpecifier) );
+
+ if ( component.isPrimitive()) {
+ controller = new PrimitiveAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(),casManager, cache, 10, component.getScaleout());
+ } else {
+ // add an endpoint for each delegate in this aggregate. The endpoint Map is required
+ // during initialization of an aggregate controller.
+ Map<String, Endpoint> endpoints = new HashMap<>();
+ AggregateAnalysisEngineComponent aggregate;
+ if ( component instanceof AggregateAnalysisEngineComponent) {
+ aggregate = (AggregateAnalysisEngineComponent)component;
+ } else if ( component instanceof TopLevelServiceComponent) {
+ aggregate = ((TopLevelServiceComponent)component).aggregateComponent();
+ } else {
+ throw new RuntimeException("Expected instance of AggregateAnalysisEngineComponent, instead is instanceof "+component.getClass().getName());
+ }
+// List<AnalysisEngineComponent> delegateComponents = ((AggregateAnalysisEngineComponent)component).getChildren();
+ List<AnalysisEngineComponent> delegateComponents = aggregate.getChildren();
+ for( AnalysisEngineComponent delegateComponent : delegateComponents ) {
+ endpoints.put(delegateComponent.getKey(), delegateComponent.getEndpoint());
+ }
+ controller = new AggregateAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(), casManager, cache, endpoints);
+ addFlowController((AggregateAnalysisEngineController)controller, (AnalysisEngineDescription)component.getResourceSpecifier());
+ // recursively create delegate controllers for all async delegates
+ createDelegateControllers(aggregate, controller);
+ }
+ if ( !controller.isTopLevelComponent() ) {
+ UimaASService service = createUimaASServiceWrapper(controller, component);
+ service.start();
+ }
+
+ return controller;
+ }
+
+
+
+ private void createDelegateControllers(AggregateAnalysisEngineComponent aggregateComponent, AnalysisEngineController controller) throws Exception {
+ for (AnalysisEngineComponent delegateComponent : aggregateComponent.getChildren()) {
+ // if error handling threshold has not been defined for the delegate, add
+ // default thresholds.
+ addDelegateDefaultErrorHandling(controller, delegateComponent.getKey());
+ if (delegateComponent.isRemote()) {
+ Endpoint endpoint = delegateComponent.getEndpoint();
+ if ("java".equals(endpoint.getServerURI()) ) {
+ endpoint.setJavaRemote();
+ }
+
+ } else {
+ if (Objects.isNull(controller.getOutputChannel(ENDPOINT_TYPE.DIRECT))) {
+ OutputChannel oc = new DirectOutputChannel().withController(controller);
+ oc.initialize();
+ controller.addOutputChannel(oc);
+ }
+ if (Objects.isNull(controller.getInputChannel(ENDPOINT_TYPE.DIRECT))) {
+ DirectInputChannel inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY)
+ .withController(controller);
+// 10/11/18 For Direct messaging the message handlers are not needed. Its using command factory
+// inputChannel.setMessageHandler(getMessageHandler(controller));
+ controller.addInputChannel(inputChannel);
+
+ }
+ createController(delegateComponent, controller /*, scaleout */);
+ }
+
+ }
+
+ }
+
+
+ private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+
+ AsynchronousUimaASService service =
+ new AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+ // Need an OutputChannel to dispatch messages from this service
+ OutputChannel outputChannel;
+ if ( ( outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ outputChannel = getOutputChannel(controller);
+ }
+
+ // Need an InputChannel to handle incoming messages
+ InputChannel inputChannel;
+ if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = getInputChannel(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ }
+
+ // add reply queue listener to the parent aggregate controller
+ if ( !controller.isTopLevelComponent() ) {
+ // For every delegate the parent controller needs a reply listener.
+ DirectListener replyListener =
+ addDelegateReplyListener(controller, component);
+ // add process, getMeta, reply queues to an endpoint
+ setDelegateDestinations(controller, service, replyListener);
+ }
+ DirectListener processListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getProcessRequestQueue(),Type.ProcessCAS);
+ inputChannel.registerListener(processListener);
+
+ DirectListener getMetaListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getMetaRequestQueue(),Type.GetMeta);
+ inputChannel.registerListener(getMetaListener);
+ if (controller.isCasMultiplier()) {
+ DirectListener freCASChannelListener =
+ createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getFreeCasQueue(),Type.FreeCAS);
+ inputChannel.registerListener(freCASChannelListener);
+ ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+ }
+
+ /*
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).
+ withController(controller).
+ withConsumerThreads(component.getScaleout()).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getProcessRequestQueue()).
+ initialize();
+ inputChannel.registerListener(processListener);
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).
+ withController(controller).
+ withConsumerThreads(getReplyScaleout(d)).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getMetaRequestQueue()).initialize();
+ inputChannel.registerListener(getMetaListener);
+
+ if (controller.isCasMultiplier()) {
+ DirectListener freCASChannelListener =
+ new DirectListener(Type.FreeCAS).
+ withController(controller).
+ withConsumerThreads(component.getScaleout()).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getFreeCasQueue()).
+ initialize();
+ inputChannel.registerListener(freCASChannelListener);
+ ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+ }
+ */
+ return service;
+ }
+ private DirectListener createDirectListener(AnalysisEngineController controller, int scaleout, DirectInputChannel inputChannel, BlockingQueue<DirectMessage> q, Type type) throws Exception{
+// DirectListener listener = new DirectListener(type).
+ return new DirectListener(type).
+ withController(controller).
+ withConsumerThreads(scaleout).
+ withInputChannel(inputChannel).
+ withQueue(q).initialize();
+// inputChannel.registerListener(listener);
+// return listener;
+ }
+ private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception {
+ DirectInputChannel parentInputChannel;
+ // create parent controller's input channel if necessary
+ if ((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ // create delegate
+ parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(controller.getParentController());
+ Handler messageHandlerChain = getMessageHandler(controller.getParentController());
+ parentInputChannel.setMessageHandler(messageHandlerChain);
+ controller.getParentController().addInputChannel(parentInputChannel);
+ } else {
+ parentInputChannel = (DirectInputChannel) controller.
+ getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ int replyScaleout = 1;
+ if ( component instanceof RemoteAnalysisEngineComponent) {
+ ((RemoteAnalysisEngineComponent)component).getReplyConsumerCount();
+ }
+
+ // USE FACTORY HERE. CHANGE DirectListener to interface
+ // DirectListner replyListener = DirectListenerFactory.newReplyListener();
+ DirectListener replyListener = new DirectListener(Type.Reply).
+ withController(controller.getParentController()).
+ withConsumerThreads(replyScaleout).
+ withInputChannel(parentInputChannel).
+ withQueue(new LinkedBlockingQueue<DirectMessage>()).
+ withName(controller.getKey()).
+ initialize();
+ parentInputChannel.registerListener(replyListener);
+
+ return replyListener;
+ }
+ protected void initialize(UimaASService service, ComponentCasPool cp, Transport transport) {
+
+ resourceManager = UimaClassFactory.produceResourceManager();
+ casManager = new AsynchAECasManager_impl(resourceManager);
+ casManager.setCasPoolSize(cp.getPoolSize());
+ casManager.setDisableJCasCache(cp.isDisableJCasCache());
+ casManager.setInitialFsHeapSize(cp.getInitialHeapSize());
+
+ if ( transport.equals(Transport.JMS)) {
+ cache = new InProcessCache();
+ } else if ( transport.equals(Transport.Java)) {
+
+ // ?????????????????????????? is this test necessary?
+ if ( (cache = (InProcessCache)System.getProperties().get("InProcessCache")) == null) {
+ cache = new InProcessCache();
+ System.getProperties().put("InProcessCache", cache);
+ }
+
+ }
+// if ( cache == null ) {
+// cache = new InProcessCache();
+// }
+ }
+
+
+ /*
+ * OLD CODE *****************************
+ */
public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) {
AsyncPrimitiveErrorConfigurationType pec;
@@ -196,13 +441,16 @@ public abstract class AbstractUimaAsServ
return null;
}
private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) {
- ErrorHandlerChain erc = controller.getErrorHandlerChain();
- for( ErrorHandler eh : erc ) {
- if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
- // add default error handling
- eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
- }
- }
+ if ( Objects.nonNull(controller.getErrorHandlerChain()) ) {
+ ErrorHandlerChain erc = controller.getErrorHandlerChain();
+ for( ErrorHandler eh : erc ) {
+ if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
+ // add default error handling
+ eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
+ }
+ }
+ }
+
}
private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception {
OutputChannel outputChannel = null;
@@ -1244,7 +1492,7 @@ public abstract class AbstractUimaAsServ
private boolean isAggregate(AnalysisEngineType aet) {
// Is this an aggregate? An aggregate has a property async=true or has delegates.
System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() );
-
+ Objects.requireNonNull(aet, "AnalysisEngineType must be non-null");
if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
return true;
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java Thu Oct 18 14:12:00 2018
@@ -24,13 +24,12 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import javax.management.ServiceNotFoundException;
-
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.InputChannel.ChannelType;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -46,7 +45,6 @@ import org.apache.uima.aae.error.handler
import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
import org.apache.uima.aae.handler.Handler;
import org.apache.uima.aae.service.AsynchronousUimaASService;
-import org.apache.uima.aae.service.ServiceRegistry;
import org.apache.uima.aae.service.UimaASService;
import org.apache.uima.aae.service.UimaAsServiceRegistry;
import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
@@ -61,8 +59,6 @@ import org.apache.uima.resource.Resource
import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
import org.apache.uima.resourceSpecifier.CasPoolType;
-import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
-import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
import org.apache.uima.resourceSpecifier.ServiceType;
public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder {
@@ -92,6 +88,144 @@ public class UimaAsDirectServiceBuilder
}
}
+
+ public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback)
+ throws Exception {
+ AsynchronousUimaASService service = null;
+
+ // is this the only one resource specifier type supported by the current uima-as?
+ if (topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor =
+ (AnalysisEngineDescription) topLevelComponent.getResourceSpecifier();
+ String endpoint = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint());
+ // Create a Top Level Service (TLS) wrapper. This wrapper may contain
+ // references to multiple TLS service instances if the TLS is scaled
+ // up.
+ service = new AsynchronousUimaASService(endpoint)
+ .withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+ .withResourceSpecifier(topLevelComponent.getResourceSpecifier())
+ .withScaleout(topLevelComponent.getScaleout());
+
+ this.buildAndDeploy(topLevelComponent, service, callback);
+
+
+ }
+ return service;
+ }
+ public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception {
+ // create ResourceManager, CasManager, and InProcessCache
+ initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java);
+
+ AnalysisEngineController topLevelController =
+ createController(topLevelComponent, callback, service.getId());
+
+ //topLevelController.addControllerCallbackListener(callback);
+
+ //topLevelController.setServiceId(service.getId());
+
+ service.withInProcessCache(super.cache);
+ System.setProperty("BrokerURI", "Direct");
+ configureTopLevelService(topLevelController, service);//, topLevelComponent.getScaleout());
+ return service;
+
+ }
+
+ private DirectOutputChannel outputChannel(AnalysisEngineController topLevelController) throws Exception {
+ DirectOutputChannel outputChannel = null;
+ if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ outputChannel = new DirectOutputChannel().withController(topLevelController);
+ topLevelController.addOutputChannel(outputChannel);
+ } else {
+ outputChannel = (DirectOutputChannel) topLevelController.
+ getOutputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ return outputChannel;
+ }
+ private DirectInputChannel inputChannel(AnalysisEngineController topLevelController) throws Exception {
+ DirectInputChannel inputChannel;
+ if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(topLevelController);
+ Handler messageHandlerChain = getMessageHandler(topLevelController);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ topLevelController.addInputChannel(inputChannel);
+ } else {
+ inputChannel = (DirectInputChannel) topLevelController.
+ getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ return inputChannel;
+ }
+/*
+ private void configureTopLevelService(AnalysisEngineController topLevelController,
+ AsynchronousUimaASService service, int howMany) throws Exception {
+*/
+ private void configureTopLevelService(AnalysisEngineController topLevelController,
+ AsynchronousUimaASService service) throws Exception {
+
+ //addErrorHandling(topLevelController, pec);
+
+
+ // create a single instance of OutputChannel for Direct communication if
+ // necessary
+ DirectOutputChannel outputChannel = outputChannel(topLevelController);
+
+ DirectInputChannel inputChannel = inputChannel(topLevelController);
+
+ if ( topLevelController instanceof AggregateAnalysisEngineController ) {
+ ((AggregateAnalysisEngineController_impl)topLevelController).
+ setServiceEndpointName(service.getEndpoint());
+ }
+ BlockingQueue<DirectMessage> pQ = null;
+ BlockingQueue<DirectMessage> mQ = null;
+
+ // Lookup queue name in service registry. If this queue exists, the new service
+ // being
+ // created here will share the same queue to balance the load.
+ UimaASService s;
+ try {
+ s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+ if ( s instanceof AsynchronousUimaASService) {
+ pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue();
+ mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue();
+ }
+
+ } catch( Exception ee) {
+ pQ = service.getProcessRequestQueue();
+ mQ = service.getMetaRequestQueue();
+ }
+
+ scaleout = service.getScaleout();
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController)
+ .withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+ initialize();
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController)
+ .withConsumerThreads(1).withInputChannel(inputChannel).
+ withQueue(mQ).initialize();
+
+ addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout );
+
+ inputChannel.registerListener(getMetaListener);
+ inputChannel.registerListener(processListener);
+
+ service.withController(topLevelController);
+
+ }
+
+
+
+
+
+
+
+
+
+
+ /*
+ * OLD CODE **********************************************************************************
+ */
+
+
public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
throws Exception {
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java Thu Oct 18 14:12:00 2018
@@ -47,27 +47,30 @@ import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
import org.apache.uima.cas.impl.Serialization;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;
public abstract class AbstractUimaAsCommand implements UimaAsCommand {
protected AnalysisEngineController controller;
private Object mux = new Object();
-
- protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+ private final MessageContext messageContext;
+
+ protected AbstractUimaAsCommand(AnalysisEngineController controller, MessageContext aMessageContext) {
this.controller = controller;
+ this.messageContext = aMessageContext;
}
- protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException {
- if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+ protected String getCasReferenceId(Class<?> concreteClassName/*, MessageContext aMessageContext */) throws AsynchAEException {
+ if (!messageContext.propertyExists(AsynchAEMessage.CasReference)) {
if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(),
"getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_message_has_cas_refid__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ new Object[] { messageContext.getEndpoint().getEndpoint() });
}
return null;
}
- return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ return messageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
}
protected CacheEntry getCacheEntryForCas(String casReferenceId) {
@@ -95,7 +98,7 @@ public abstract class AbstractUimaAsComm
return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController);
}
- protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) {
+ protected void handleError(Exception e, CacheEntry cacheEntry/*, MessageContext mc */) {
if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
@@ -105,7 +108,7 @@ public abstract class AbstractUimaAsComm
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
}
ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, messageContext.getEndpoint());
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
controller.dropCAS(cacheEntry.getCas());
@@ -154,31 +157,69 @@ public abstract class AbstractUimaAsComm
}
- protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) {
+ protected ErrorContext populateErrorContext(/*MessageContext aMessageCtx */) {
ErrorContext errorContext = new ErrorContext();
- if (aMessageCtx != null) {
+ if (messageContext != null) {
try {
- if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+ if (messageContext.propertyExists(AsynchAEMessage.Command)) {
errorContext.add(AsynchAEMessage.Command,
- aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+ messageContext.getMessageIntProperty(AsynchAEMessage.Command));
}
- if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+ if (messageContext.propertyExists(AsynchAEMessage.MessageType)) {
errorContext.add(AsynchAEMessage.MessageType,
- aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+ messageContext.getMessageIntProperty(AsynchAEMessage.MessageType));
}
- if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+ if (messageContext.propertyExists(AsynchAEMessage.CasReference)) {
errorContext.add(AsynchAEMessage.CasReference,
- aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+ messageContext.getMessageStringProperty(AsynchAEMessage.CasReference));
}
- errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage());
+ errorContext.add(UIMAMessage.RawMsg, messageContext.getRawMessage());
} catch (Exception e) { /* ignore */
}
}
return errorContext;
}
-
+ protected Endpoint getEndpoint() {
+ return messageContext.getEndpoint();
+ }
+ protected int getMessageIntProperty(String propertyName) throws Exception {
+ return messageContext.getMessageIntProperty(propertyName);
+ }
+ protected String getMessageStringProperty(String propertyName) throws Exception {
+ return messageContext.getMessageStringProperty(propertyName);
+ }
+ protected ResourceMetaData getResourceMetaData() throws Exception {
+ return (ResourceMetaData)messageContext.getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+ }
+ protected String getStringMessage() throws Exception {
+ return messageContext.getStringMessage();
+ }
+ protected Object getMessageObjectProperty(String propertyName) throws Exception {
+ return messageContext.getMessageObjectProperty(propertyName);
+ }
+ protected boolean getMessageBooleanProperty(String propertyName) throws Exception {
+ return messageContext.getMessageBooleanProperty(propertyName);
+ }
+ protected long getMessageLongProperty( String propertyName) throws Exception {
+ return messageContext.getMessageLongProperty(propertyName);
+ }
+ protected boolean propertyExists(String propertyName) throws Exception {
+ return messageContext.propertyExists(propertyName);
+ }
+ protected String getEndpointName() {
+ return messageContext.getEndpointName();
+ }
+ protected Object getObjectMessage() throws Exception {
+ return messageContext.getObjectMessage();
+ }
+ protected byte[] getByteMessage() throws Exception {
+ return messageContext.getByteMessage();
+ }
+ protected MessageContext getMessageContext() {
+ return messageContext;
+ }
protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException {
Endpoint endpoint = null;
String parentId = parentCasId;
@@ -224,8 +265,8 @@ public abstract class AbstractUimaAsComm
return cas;
}
- protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint,
- MessageContext mc) throws Exception {
+ protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint
+ /*MessageContext mc*/) throws Exception {
SerializationResult result = new SerializationResult();
// Aggregate time spent waiting for a CAS in the shadow cas pool
@@ -250,17 +291,17 @@ public abstract class AbstractUimaAsComm
// Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add
// a new serialization format add a new class which implements
// UimaASDeserializer and modify DeserializerFactory class.
- UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
deserializer.deserialize(result);
return result;
}
- protected SerializationResult deserializeInputCAS( MessageContext mc)
+ protected SerializationResult deserializeInputCAS()
throws Exception {
SerializationResult result = new SerializationResult();
- String origin = mc.getEndpoint().getEndpoint();
- Endpoint endpoint = mc.getEndpoint();
+ String origin = messageContext.getEndpoint().getEndpoint();
+ Endpoint endpoint = messageContext.getEndpoint();
// Time how long we wait on Cas Pool to fetch a new CAS
long t1 = controller.getCpuTime();
@@ -276,20 +317,20 @@ public abstract class AbstractUimaAsComm
return null;
}
- UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext);
deserializer.deserialize(result);
return result;
}
- protected Delegate getDelegate(MessageContext mc) throws AsynchAEException {
+ protected Delegate getDelegate(/* MessageContext mc */) throws AsynchAEException {
String delegateKey = null;
- if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
- String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ if (messageContext.getEndpoint().getEndpoint() == null || messageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+ String fromEndpoint = messageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
delegateKey = ((AggregateAnalysisEngineController) controller)
.lookUpDelegateKey(fromEndpoint);
} else {
delegateKey = ((AggregateAnalysisEngineController) controller)
- .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+ .lookUpDelegateKey(messageContext.getEndpoint().getEndpoint());
}
return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java Thu Oct 18 14:12:00 2018
@@ -23,14 +23,14 @@ import org.apache.uima.aae.controller.En
import org.apache.uima.aae.message.MessageContext;
public class CollectionProcessCompleteRequestCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller, mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- Endpoint endpoint = mc.getEndpoint();
- controller.collectionProcessComplete(endpoint);
+// Endpoint endpoint = mc.getEndpoint();
+ controller.collectionProcessComplete(super.getEndpoint());
}
}
Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java (original)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java Thu Oct 18 14:12:00 2018
@@ -26,14 +26,14 @@ import org.apache.uima.aae.message.Async
import org.apache.uima.aae.message.MessageContext;
public class CollectionProcessCompleteResponseCommand extends AbstractUimaAsCommand {
- private MessageContext mc;
+// private MessageContext mc;
public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) {
- super(controller);
- this.mc = mc;
+ super(controller,mc);
+// this.mc = mc;
}
public void execute() throws Exception {
- Delegate delegate = super.getDelegate(mc);
+ Delegate delegate = super.getDelegate();
try {
System.out.println("..... Controller:"+controller.getComponentName()+" Handling CPC From "+delegate.getKey());
((AggregateAnalysisEngineController)controller)
@@ -41,7 +41,7 @@ public class CollectionProcessCompleteRe
} catch (Exception e) {
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
- errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
controller.getErrorHandlerChain().handle(e, errorContext, controller);
}
}