You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [8/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.uima.aae.AsynchAECasManager_impl;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.DelegateEndpoint;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.UimaAsDelegateException;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl;
+import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.handler.input.ProcessResponseHandler;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.delegate.AggregateAnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.CasMultiplierNature;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.analysis_engine.metadata.FlowConstraints;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.DirectListener;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.client.DirectOutputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.resource.ResourceCreationSpecifier;
+import org.apache.uima.resource.ResourceManager;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasMultiplierType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegatesType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariableType;
+import org.apache.uima.resourceSpecifier.EnvironmentVariablesType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.uima.resourceSpecifier.TopLevelAnalysisEngineType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public abstract class AbstractUimaAsServiceBuilder implements ServiceBuilder {
+ protected InProcessCache cache;
+ protected AsynchAECasManager_impl casManager;
+ protected ResourceManager resourceManager;
+ private static final String NoParent= "NoParent";
+ private enum FlowControllerType {
+ FIXED
+ }
+
+ protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+
+ public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) {
+ AsyncPrimitiveErrorConfigurationType pec;
+ if ( s.getAnalysisEngine() == null ) {
+ s.addNewAnalysisEngine();
+ }
+ pec = s.getAnalysisEngine().addNewAsyncPrimitiveErrorConfiguration();
+ // Create default error handling for process CAS requests
+ // By default, any error will result in process termination
+ ProcessCasErrorsType pcet = pec.addNewProcessCasErrors();
+ pcet.setContinueOnRetryFailure("false");
+ pcet.setMaxRetries(0);
+ pcet.setThresholdAction("terminate");
+ pcet.setThresholdCount(0);
+ pcet.setThresholdWindow(0);
+ pcet.setTimeout(0); // no timeout
+
+ CollectionProcessCompleteErrorsType cpcet = pec.addNewCollectionProcessCompleteErrors();
+ cpcet.setTimeout(0);
+
+ return pec;
+ }
+
+ /**
+ * A DD may optionally specify env vars for this process. If specified in DD, add
+ * all env vars to this process environment.
+ *
+ * @param dd - deployment descriptor object
+ */
+ public void addEnvironmentVariablesFromDD(AnalysisEngineDeploymentDescriptionDocument dd) {
+
+ ServiceType st = getService(dd);
+ if ( st != null && st.getEnvironmentVariables() != null ) {
+ EnvironmentVariablesType vars =
+ st.getEnvironmentVariables();
+ for( EnvironmentVariableType envVar : vars.getEnvironmentVariableArray() ) {
+ System.getenv().put(envVar.getName(), envVar.getStringValue());
+ }
+ }
+ }
+ /**
+ * Add JmsInputChannel to handle replies from remote delegates
+ *
+ * @param controller
+ * @param type
+ * @throws Exception
+ */
+ private void addInputChannelForRemoteReplies(AnalysisEngineController controller, ChannelType type) throws Exception {
+/*
+
+ DISABLE THIS FOR NOW. ENABLE WHEN READY TO TEST AGGRREGATE WITH
+ REMOTE DELEGATE
+
+
+ // This needs to be JmsInputChannel since it will process remote replies which are JMS based
+ InputChannel inputChannel = UimaAsJmsServiceBuilder.createInputChannel(type);
+ // setup bi-directional link between controller and an input channel
+ inputChannel.setController(controller);
+ controller.addInputChannel(inputChannel);
+
+ inputChannel.setEndpointName("");
+ // add message handlers. These will handle GetMeta, Process, CPC replies
+ inputChannel.setMessageHandler(getMessageHandler(controller));
+*/
+
+ }
+ /**
+ * Check if a given delegate is defined in a DD. This is optional in a DD, but
+ * if present there is a configuration which may override default scalout, error
+ * handling (remotes only), and if the delegate is remote.
+ *
+ * @param targetDelegateKey - key of the delegate to find
+ * @param ddAggregate - DD aggregate instance
+ * @return
+ */
+ private AnalysisEngineDelegate getMatchingDelegateFromDD(String targetDelegateKey, AggregateAnalysisEngineDelegate ddAggregate) {
+ if ( ddAggregate.getDelegates() != null ) {
+ for( AnalysisEngineDelegate delegate : ddAggregate.getDelegates() ) {
+ if ( targetDelegateKey.equals(delegate.getKey())) {
+ return delegate;
+ }
+ }
+ }
+
+ return null;
+ }
+ private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) {
+ ErrorHandlerChain erc = controller.getErrorHandlerChain();
+ for( ErrorHandler eh : erc ) {
+ if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
+ // add default error handling
+ eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold());
+ }
+ }
+ }
+ private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception {
+ OutputChannel outputChannel = null;
+// if (controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ outputChannel = UimaAsDirectServiceBuilder.createOutputChannel(controller);
+ controller.addOutputChannel(outputChannel);
+// } else {
+// outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+// }
+
+ return outputChannel;
+ }
+ private InputChannel getInputChannel(AnalysisEngineController controller) throws Exception {
+ InputChannel inputChannel;
+// if ((controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = UimaAsDirectServiceBuilder.createInputChannel(ChannelType.REQUEST_REPLY);
+ inputChannel.setController(controller);
+
+// } else {
+// inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT);
+// }
+ return inputChannel;
+ }
+ private int getScaleout(AnalysisEngineDelegate d) {
+ int scaleout = 1; // default
+ if ( d != null && d.getScaleout() > 1 ) {
+ // fetch scaleout from the DD spec for this delegate
+ scaleout = d.getScaleout();
+ }
+ return scaleout;
+ }
+ private int getReplyScaleout(AnalysisEngineDelegate d) {
+ int scaleout = 1; // default
+ if ( d != null && d.getReplyScaleout() > 1 ) {
+ // fetch scaleout from the DD spec for this delegate
+ scaleout = d.getReplyScaleout();
+ }
+ return scaleout;
+ }
+ private void setDelegateDestinations(AnalysisEngineController controller, AsynchronousUimaASService service, DirectListener replyListener) {
+ Map<String,Endpoint> aggregateDelegateEndpoints =
+ ((AggregateAnalysisEngineController_impl)controller.getParentController()).getDestinations();
+ Endpoint endpoint = aggregateDelegateEndpoints.get(controller.getKey());
+ endpoint.setReplyDestination(replyListener.getEndpoint());
+ endpoint.setGetMetaDestination(service.getMetaRequestQueue());
+ endpoint.setDestination(service.getProcessRequestQueue());
+
+ }
+ private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineDelegate d) throws Exception {
+ DirectInputChannel parentInputChannel;
+ // create parent controller's input channel if necessary
+ if ((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ // create delegate
+ parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(controller.getParentController());
+ Handler messageHandlerChain = getMessageHandler(controller.getParentController());
+ parentInputChannel.setMessageHandler(messageHandlerChain);
+ controller.getParentController().addInputChannel(parentInputChannel);
+ } else {
+ parentInputChannel = (DirectInputChannel) controller.
+ getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+ int replyScaleout = 1;
+ if ( d != null && d.getReplyScaleout() > 1) {
+ replyScaleout = d.getReplyScaleout();
+ }
+
+ // USE FACTORY HERE. CHANGE DirectListener to interface
+ // DirectListner replyListener = DirectListenerFactory.newReplyListener();
+ DirectListener replyListener = new DirectListener(Type.Reply).
+ withController(controller.getParentController()).
+ withConsumerThreads(replyScaleout).
+ withInputChannel(parentInputChannel).
+ withQueue(new LinkedBlockingQueue<DirectMessage>()).
+ withName(controller.getKey()).
+ initialize();
+ parentInputChannel.registerListener(replyListener);
+ return replyListener;
+ }
+
+ private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineDelegate d) throws Exception {
+
+ AsynchronousUimaASService service =
+ new AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+ // Need an OutputChannel to dispatch messages from this service
+ OutputChannel outputChannel;
+ if ( ( outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ outputChannel = getOutputChannel(controller);
+ }
+
+ // Need an InputChannel to handle incoming messages
+ InputChannel inputChannel;
+ if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = getInputChannel(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ }
+
+ // add reply queue listener to the parent aggregate controller
+ if ( !controller.isTopLevelComponent() ) {
+ // For every delegate the parent controller needs a reply listener.
+ DirectListener replyListener =
+ addDelegateReplyListener(controller, d);
+ // add process, getMeta, reply queues to an endpoint
+ setDelegateDestinations(controller, service, replyListener);
+ }
+
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).
+ withController(controller).
+ withConsumerThreads(getScaleout(d)).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getProcessRequestQueue()).
+ initialize();
+ inputChannel.registerListener(processListener);
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).
+ withController(controller).
+ withConsumerThreads(getReplyScaleout(d)).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getMetaRequestQueue()).initialize();
+ inputChannel.registerListener(getMetaListener);
+
+ if (controller.isCasMultiplier()) {
+ DirectListener freCASChannelListener =
+ new DirectListener(Type.FreeCAS).
+ withController(controller).
+ withConsumerThreads(getScaleout(d)).
+ withInputChannel((DirectInputChannel)inputChannel).
+ withQueue(service.getFreeCasQueue()).
+ initialize();
+ inputChannel.registerListener(freCASChannelListener);
+ ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+ }
+
+ return service;
+ }
+ private boolean isAggregate(AnalysisEngineDelegate d, ResourceSpecifier resourceSpecifier) {
+ boolean aggregate = false;
+ if (resourceSpecifier instanceof AnalysisEngineDescription ) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ if ( d != null ) {
+ if ((d instanceof AggregateAnalysisEngineDelegate) ||
+ (d.isAsync() && !d.isPrimitive()) ) {
+ aggregate = true;
+ }
+ } else if ( !aeDescriptor.isPrimitive() ) {
+ aggregate = true;
+ }
+ }
+ return aggregate;
+ }
+
+ private void createDelegateControllers( AnalysisEngineDelegate d, Map<String, ResourceSpecifier> delegates, AnalysisEngineController controller) throws Exception {
+ for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+ // if error handling threshold has not been defined for the delegate, add
+ // default thresholds.
+ addDelegateDefaultErrorHandling(controller, delegate.getKey());
+ AnalysisEngineDelegate aed = null;
+ int scaleout = 1;
+ // fetch delegate configuration from a DD if it exists. The DD configuration
+ // is optional but may exist to support default overrides for scaleout,
+ // error handling(remotes only), and connectivity to a remote (broker,queue)
+ if ( d != null ) {
+ aed = getMatchingDelegateFromDD(delegate.getKey(), (AggregateAnalysisEngineDelegate)d);
+ }
+ // check if DD configuration exists and this delegate has been configured as a remote
+ if ( aed != null && aed.isRemote() ) {
+ Endpoint_impl endpoint =
+ (Endpoint_impl)((AggregateAnalysisEngineController)controller).getDestinations().get(delegate.getKey());
+ if ( endpoint.getServerURI().equals("java")) {
+ endpoint.setJavaRemote();
+ }
+ configureRemoteDelegate((RemoteAnalysisEngineDelegate)aed, (AggregateAnalysisEngineController)controller, endpoint);
+ } else {
+ // Not a remote, but it still may have DD configuration to
+ if ( aed != null ) {
+ scaleout = aed.getScaleout();
+ }
+ if ( controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ OutputChannel oc = new DirectOutputChannel().withController(controller);
+ oc.initialize();
+ controller.addOutputChannel(oc);
+ }
+ if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+ DirectInputChannel inputChannel =
+ new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ }
+ ResourceSpecifier delegateResourceSpecifier = UimaClassFactory.produceResourceSpecifier(delegate.getValue().getSourceUrlString());
+ createController( aed, delegateResourceSpecifier, delegate.getKey(), controller, scaleout);
+ }
+ }
+
+ }
+ /**
+ * Recursively walks through the AE descriptor creating instances of AnalysisEngineController
+ * and linking them in parent-child tree.
+ *
+ * @param d - wrapper around delegate defined in DD (may be null)
+ * @param resourceSpecifier - AE descriptor specifier
+ * @param name - name of the delegate
+ * @param parentController - reference to a parent controller. TopLevel has no parent
+ * @param howManyInstances - scalout for the delegate
+ *
+ * @return
+ * @throws Exception
+ */
+ public AnalysisEngineController createController( AnalysisEngineDelegate d, ResourceSpecifier resourceSpecifier, String name, AnalysisEngineController parentController, int howManyInstances) throws Exception {
+ AnalysisEngineController controller = null;
+ System.out.println("---------Controller:"+name+" resourceSpecifier:"+resourceSpecifier.getClass().getName()+" ResourceCreationSpecifier:"+(resourceSpecifier instanceof ResourceCreationSpecifier) );
+
+ if ( isAggregate(d, resourceSpecifier)) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ // the following should be removed at some point. Its just
+ // catching unexpected behaviour which should not happen
+ // after the code is debugged
+ if ( !(d instanceof AggregateAnalysisEngineDelegate) ) {
+ System.out.println("............ what?!");
+ Thread.dumpStack();
+ }
+ AggregateAnalysisEngineDelegate aggregateDeleagte = null;
+
+ if ( d != null ) {
+ aggregateDeleagte = (AggregateAnalysisEngineDelegate)d;
+ }
+ // add an endpoint for each delegate in this aggregate. The endpoint Map is required
+ // during initialization of an aggregate controller.
+ Map<String, Endpoint> endpoints = getDelegateEndpoints( aeDescriptor, aggregateDeleagte );
+
+ controller = new AggregateAnalysisEngineController_impl(parentController, name, resourceSpecifier.getSourceUrlString(), casManager, cache, endpoints);
+
+ if ( aggregateDeleagte != null ) {
+ controller.setErrorHandlerChain(aggregateDeleagte.getDelegateErrorHandlerChain());
+ }
+ addFlowController((AggregateAnalysisEngineController)controller, aeDescriptor);
+
+ Map<String, ResourceSpecifier> delegates = aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+
+ createDelegateControllers(aggregateDeleagte, delegates, controller);
+ } else {
+ controller = new PrimitiveAnalysisEngineController_impl(parentController, name, resourceSpecifier.getSourceUrlString(),casManager, cache, 10, howManyInstances);
+ }
+ if ( !controller.isTopLevelComponent() ) {
+ UimaASService service = createUimaASServiceWrapper(controller, d);
+ service.start();
+ }
+
+ return controller;
+ }
+ private void configureRemoteDelegate( RemoteAnalysisEngineDelegate remote,AggregateAnalysisEngineController aggregateController, Endpoint_impl endpoint) throws Exception {
+ // check if this remote is actually deployed in the same process (collocated).
+ // If is not local, then it must be a JMS service.
+ if ( remote.isCollocated() ) {
+ //
+ // addListenerForReplyHandling(aggregateController, endpoint, remote );
+
+ } else {
+ // Remote delegate can only be reached via JMS. To communicate with such delegate
+ // the aggregate needs JMS based listener, JmsInputChannel and JmsOutputChannel.
+ //String brokerURL = ((RemoteAnalysisEngineType)o).getInputQueue().getBrokerURL();
+ //int prefetch = ((RemoteAnalysisEngineType)o).getInputQueue().getPrefetch();
+ //endpoint.setRemote(true);
+// addJmsListenerForReplyHandling(aggregateController, endpoint, remote );
+ // Single instance of JMS input channel is needed to handle all remote delegate replies.
+ if ( aggregateController.getInputChannel(ENDPOINT_TYPE.JMS) == null ) {
+ // configure JMS input channel and add it to the controller
+ addInputChannelForRemoteReplies(aggregateController,ChannelType.REPLY);
+ }
+ // each remote delegate needs a dedicated JMS Listener to receive replies
+// addJmsListenerForReplyHandling(aggregateController, endpoint, remote );
+
+ }
+ addListenerForReplyHandling(aggregateController, endpoint, remote );
+
+ }
+ protected void addFlowController(AggregateAnalysisEngineController aggregateController, AnalysisEngineDescription rs) throws Exception {
+ String fcDescriptor=null;
+ System.out.println(rs.getSourceUrlString());
+
+ // first check if the AE aggregate descriptor defines a custom flow controller
+ if ( rs.getFlowControllerDeclaration() != null ) {
+ if( rs.getFlowControllerDeclaration().getImport() == null ) {
+ System.out.println("........................ What!!!!");
+ }
+
+ // the fc is either imported by name or a location
+ fcDescriptor = rs.getFlowControllerDeclaration().getImport().getName();
+ if ( fcDescriptor == null ) {
+ fcDescriptor = rs.getFlowControllerDeclaration().getImport().getLocation();
+
+ fcDescriptor = UimaASUtils.fixPath(rs.getSourceUrlString(), fcDescriptor);
+ } else {
+ throw new RuntimeException("*** Internal error - Invalid flowController specification - descriptor:"+rs.getFlowControllerDeclaration().getSourceUrlString());
+ }
+ } else {
+ FlowConstraints fc = rs.getAnalysisEngineMetaData().getFlowConstraints();
+ if (FlowControllerType.FIXED.name().equals(fc.getFlowConstraintsType()) ) {
+ fcDescriptor = ("*importByName:org.apache.uima.flow.FixedFlowController");
+ }
+ }
+ ((AggregateAnalysisEngineController_impl)aggregateController).setFlowControllerDescriptor(fcDescriptor);
+
+ }
+
+ protected Handler getMessageHandler(AnalysisEngineController controller) {
+ MetadataRequestHandler_impl metaHandler = new MetadataRequestHandler_impl("MetadataRequestHandler");
+ metaHandler.setController(controller);
+ ProcessRequestHandler_impl processHandler = new ProcessRequestHandler_impl("ProcessRequestHandler");
+ processHandler.setController(controller);
+ metaHandler.setDelegate(processHandler);
+ if ( !controller.isPrimitive() ) {
+ MetadataResponseHandler_impl metaResponseHandler =
+ new MetadataResponseHandler_impl("MetadataResponseHandler");
+ metaResponseHandler.setController(controller);
+ processHandler.setDelegate(metaResponseHandler);
+
+ ProcessResponseHandler processResponseHandler =
+ new ProcessResponseHandler("ProcessResponseHandler");
+ processResponseHandler.setController(controller);
+ metaResponseHandler.setDelegate(processResponseHandler);
+
+ }
+ return metaHandler;
+ }
+ protected void validateColocatedDelegates(DelegatesType dt, ResourceSpecifier resourceSpecifier) throws Exception {
+ for( DelegateAnalysisEngineType delegate : dt.getAnalysisEngineArray()) {
+ checkDelegateKey(delegate.getKey(), resourceSpecifier);
+ }
+ }
+
+ protected void validateRemoteDelegates(DelegatesType dt, ResourceSpecifier resourceSpecifier) throws Exception {
+ for( RemoteAnalysisEngineType remoteDelegate : dt.getRemoteAnalysisEngineArray() ) {
+ checkDelegateKey(remoteDelegate.getKey(), resourceSpecifier);
+ }
+
+
+ }
+
+ private void addCasMultiplierProperties(Endpoint endpoint, AnalysisEngineDelegate aed ) {
+ if ( aed.isCasMultiplier() ) {
+ endpoint.setIsCasMultiplier(true);
+ endpoint.setProcessParentLast(aed.getCasMultiplier().isProcessParentLast());
+ if ( aed.getCasMultiplier().getPoolSize() > 1) {
+ endpoint.setShadowCasPoolSize(aed.getCasMultiplier().getPoolSize());
+ }
+ if ( aed.getCasMultiplier().getInitialHeapSize() > 0 ) {
+ endpoint.setInitialFsHeapSize(aed.getCasMultiplier().getInitialHeapSize());
+ }
+ endpoint.setDisableJCasCache(aed.getCasMultiplier().disableJCasCache());
+ }
+
+ }
+ private void addTimeouts(Endpoint endpoint, AnalysisEngineDelegate aed ) {
+ endpoint.setMetadataRequestTimeout(aed.getGetMetaTimeout());
+ endpoint.setProcessRequestTimeout(aed.getProcessTimeout());
+ endpoint.setCollectionProcessCompleteTimeout(aed.getCpcTimeout());
+
+ }
+ private Endpoint getEndpoint(Entry<String,ResourceSpecifier> delegate, AggregateAnalysisEngineDelegate ddAggregate) {
+ // assume this delegate is not remote
+ boolean isRemote = false;
+ // Check if there is a DD configuration for this delegate
+ String serviceEndpoint = delegate.getKey();
+ AnalysisEngineDelegate aed = getMatchingDelegateFromDD(delegate.getKey(), ddAggregate);
+ if ( aed != null && aed.isRemote()) {
+ isRemote = true;
+ serviceEndpoint = resolvePlaceholder(((RemoteAnalysisEngineDelegate)aed).getQueueName());
+ }
+ // For each delegate create an Endpoint object.
+ Endpoint endpoint = new DelegateEndpoint(). new Builder().
+ withDelegateKey(delegate.getKey()).
+ withEndpointName(serviceEndpoint).
+ setRemote(isRemote).
+ withResourceSpecifier(delegate.getValue()).
+ build();
+ // if there is a DD configuration for this delegate, override defaults with
+ // configured values.
+ if (aed != null ) {
+ // if this service is a CasMultiplier, add CM properties to the endpoint object
+ addCasMultiplierProperties(endpoint, aed);
+ addTimeouts(endpoint, aed);
+ if ( endpoint.isRemote() ) {
+ configureRemoteEndpoint((RemoteAnalysisEngineDelegate)aed, endpoint);
+ } else {
+ configureColocatedEndpoint(aed, endpoint);
+ }
+ }
+ return endpoint;
+ }
+ private void configureColocatedEndpoint(AnalysisEngineDelegate aed, Endpoint endpoint) {
+ // co-located delegate may have optional scaleout info. Override
+ // defaults if necessary
+ if ( aed.getScaleout() > 1) {
+ endpoint.setConcurrentRequestConsumers(aed.getScaleout());
+ }
+ if ( aed.getReplyScaleout() > 1) {
+ endpoint.setConcurrentReplyConsumers(aed.getReplyScaleout());
+ }
+
+ }
+ private void configureRemoteEndpoint(RemoteAnalysisEngineDelegate remoteDelegate, Endpoint endpoint ) {
+ // a remote delegate must include broker and queue
+ endpoint.setServerURI(resolvePlaceholder(remoteDelegate.getBrokerURI()));
+ if ( remoteDelegate.getReplyScaleout() > 1 ) {
+ endpoint.setConcurrentRequestConsumers(remoteDelegate.getReplyScaleout());
+ }
+ if (remoteDelegate.getSerialization() != null ) {
+ SerialFormat sf = SerialFormat.valueOf(remoteDelegate.getSerialization().toUpperCase());
+ endpoint.setSerialFormat(sf);
+ }
+
+ }
+ protected Map<String, Endpoint> getDelegateEndpoints(AnalysisEngineDescription aeDescriptorAggregate, AggregateAnalysisEngineDelegate ddAggregate ) throws Exception {
+ Map<String, Endpoint> endpoints = new HashMap<>();
+
+ Map<String, ResourceSpecifier> delegates = aeDescriptorAggregate.getDelegateAnalysisEngineSpecifiers();
+ // Create an endpoint object for each delegate of this aggregate
+ for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+ Endpoint endpoint = getEndpoint(delegate, ddAggregate);
+ endpoints.put(delegate.getKey(), endpoint);
+ }
+ return endpoints;
+ }
+ protected void validateDD(AnalysisEngineDeploymentDescriptionDocument dd, ResourceSpecifier resourceSpecifier) throws Exception {
+
+ if (resourceSpecifier instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ // verify delegate keys in DD if this is an aggregate uima-as service
+ if (!aeDescriptor.isPrimitive()) {
+ AnalysisEngineType aet = getService(dd).getAnalysisEngine();
+ if ( aet == null ) {
+ aet = getService(dd).addNewAnalysisEngine();
+ }
+ if ( aet.getDelegates() != null ) {
+ DelegatesType dt = aet.getDelegates();
+ if ( dt.getAnalysisEngineArray() != null ) {
+ validateRemoteDelegates(dt, resourceSpecifier);
+ validateColocatedDelegates(dt, resourceSpecifier);
+ }
+ }
+ }
+ }
+ }
+ protected boolean validDelegate(ResourceSpecifier resourceSpecifier, String delegateKey) throws Exception {
+ if (resourceSpecifier instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ if (aeDescriptor.isPrimitive()) {
+
+ } else {
+ Map<String, ResourceSpecifier> delegates = aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+ for (Map.Entry<String, ResourceSpecifier> delegate : delegates.entrySet()) {
+ String key = delegate.getKey();
+ if ( delegateKey.equals(key)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ protected void checkDelegateKey(String key, ResourceSpecifier resourceSpecifier) throws Exception {
+ if ( key == null || key.trim().length() == 0) {
+ throw new UimaAsDelegateException("*** ERROR ** The delegate key in the deployment descriptor not specified. The delegate key is required and must match a delegate in the referenced descriptor",
+ new RuntimeException("Invalid Delegate Key in Deployment Descriptor ("+key+")"));
+ }
+ if (!validDelegate(resourceSpecifier, key)) {
+ throw new UimaAsDelegateException("*** ERROR ** The delegate in the deployment descriptor with key="+key+" does not match any delegates in the referenced descriptor",
+ new RuntimeException("Invalid Delegate in Deployment Descriptor ("+key+")"));
+ }
+ }
+ protected boolean isDelegate(String parentController) {
+ return NoParent.equals(parentController);
+ }
+ protected boolean isTopLevel(String parentController) {
+ return NoParent.equals(parentController);
+ }
+ protected String getDelegateKey(String parentName, String name) {
+ String key = name;
+ // if ( parentName != "NoParent" && parentName != "TLS") {
+
+ if ( !parentName.equals("NoParent") && !parentName.equals("TLS")) {
+ key = parentName+"/"+name;
+ }
+// else {
+// key = name;
+// }
+ return key;
+ }
+ protected CasPoolType getCasPoolConfiguration(AnalysisEngineDeploymentDescriptionDocument doc) {
+ CasPoolType cp;
+ if ( (cp = doc.getAnalysisEngineDeploymentDescription().getDeployment().getCasPool()) == null ) {
+ cp = doc.getAnalysisEngineDeploymentDescription().getDeployment().addNewCasPool();
+ cp.setInitialFsHeapSize(512);
+ cp.setNumberOfCASes(1);
+ cp.setDisableJCasCache(false);
+ }
+ return cp;
+ }
+ protected int howManyInstances(AnalysisEngineDeploymentDescriptionDocument doc) {
+ int howMany = 1; //default
+ TopLevelAnalysisEngineType topLevel =
+ doc.getAnalysisEngineDeploymentDescription().
+ getDeployment().
+ getService().
+ getAnalysisEngine();
+ if ( topLevel != null && topLevel.getScaleout() != null ) {
+ howMany = topLevel.getScaleout().getNumberOfInstances();
+ }
+
+ return howMany;
+ }
+ protected void initialize(UimaASService service, CasPoolType cp, Transport transport) {
+
+ resourceManager = UimaClassFactory.produceResourceManager();
+ casManager = new AsynchAECasManager_impl(resourceManager);
+ casManager.setCasPoolSize(cp.getNumberOfCASes());
+ casManager.setDisableJCasCache(cp.getDisableJCasCache());
+ casManager.setInitialFsHeapSize(cp.getInitialFsHeapSize());
+
+ if ( transport.equals(Transport.JMS)) {
+ cache = new InProcessCache();
+ } else if ( transport.equals(Transport.Java)) {
+ if ( (cache = (InProcessCache)System.getProperties().get("InProcessCache")) == null) {
+ cache = new InProcessCache();
+ System.getProperties().put("InProcessCache", cache);
+ }
+
+ }
+// if ( cache == null ) {
+// cache = new InProcessCache();
+// }
+ }
+ protected String getAEDescriptorPath(AnalysisEngineDeploymentDescriptionDocument doc) {
+ ServiceType s = getService(doc);
+ if ( s.getTopDescriptor() == null ) {
+ throw new RuntimeException("Missing <topDescriptor> element in the deployment descriptor");
+ }
+ String aeDescriptor = "";
+ if ( s.getTopDescriptor().getImport().getName() != null ) {
+ aeDescriptor = s.getTopDescriptor().getImport().getName();
+ } else if ( s.getTopDescriptor().getImport().getLocation() != null ) {
+ aeDescriptor = s.getTopDescriptor().getImport().getLocation();
+ } else {
+ throw new RuntimeException("Missing <import> element in the deployment descriptor");
+ }
+ XmlDocumentProperties dp = doc.documentProperties();
+ System.out.println(dp.getSourceName());
+ aeDescriptor = UimaASUtils.fixPath(dp.getSourceName(), aeDescriptor);
+
+ return aeDescriptor;
+ }
+ protected ProcessCasErrorsType getTopLevelServiceErrorConfiguration(AnalysisEngineDeploymentDescriptionDocument doc) {
+ ProcessCasErrorsType topLevelErrorHandlingConfig = null;
+ ServiceType s = getService(doc);
+ AsyncPrimitiveErrorConfigurationType pec;
+ if ( s.getAnalysisEngine() != null && s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null) {
+ pec = s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+ } else {
+ if ( s.getAnalysisEngine() == null ) {
+ s.addNewAnalysisEngine();
+ }
+ pec = s.getAnalysisEngine().addNewAsyncPrimitiveErrorConfiguration();
+ // Create default error handling for process CAS requests
+ // By default, any error will result in process termination
+ ProcessCasErrorsType pcet = pec.addNewProcessCasErrors();
+ pcet.setContinueOnRetryFailure("false");
+ pcet.setMaxRetries(0);
+ pcet.setThresholdAction("terminate");
+ pcet.setThresholdCount(0);
+ pcet.setThresholdWindow(0);
+ pcet.setTimeout(0); // no timeout
+
+ CollectionProcessCompleteErrorsType cpcet = pec.addNewCollectionProcessCompleteErrors();
+ cpcet.setTimeout(0);
+
+
+ }
+ topLevelErrorHandlingConfig = pec.getProcessCasErrors();
+ return topLevelErrorHandlingConfig;
+ }
+ public AnalysisEngineDeploymentDescriptionDocument getDD(String deploymentDescriptorPath ) throws Exception {
+ return parseDD(deploymentDescriptorPath);
+ }
+ protected AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception {
+ return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath));
+
+ }
+ private CasMultiplierNature getCasMultiplier(CasMultiplierType cm) {
+ String initialHeapSize = cm.getInitialFsHeapSize();
+ int poolSize = cm.getPoolSize();
+ String processParentLast = cm.getProcessParentLast();
+ CasMultiplierNature cmn = new CasMultiplierNature();
+ if ( initialHeapSize == null ) {
+ initialHeapSize = "512";
+ }
+ cmn.setInitialHeapSize(Integer.parseInt(initialHeapSize));
+ cmn.setPoolSize(poolSize);
+ if ( processParentLast == null ) {
+ processParentLast = "false";
+ }
+ cmn.setProcessParentLast(Boolean.parseBoolean(processParentLast));
+ cmn.disableJCasCache(cm.getDisableJCasCache());
+ return cmn;
+ }
+ private Map<String, Threshold> getThresholdMap(Class<? extends ErrorHandler> errorHandlerClass, ErrorHandlerChain errorHandlerChain ) {
+ Map<String, Threshold> thresholdMap = null;
+ for( ErrorHandler handler : errorHandlerChain ) {
+ if ( errorHandlerClass.isInstance(handler)) {
+ thresholdMap = handler.getEndpointThresholdMap();
+ break;
+ }
+ }
+ if ( thresholdMap == null ) {
+ thresholdMap = new HashMap<>();
+ }
+ return thresholdMap;
+ }
+
+ private void addRemoteDelegates(RemoteAnalysisEngineType[] remoteAnalysisEngineArray, AggregateAnalysisEngineDelegate aggregate ) {
+ ErrorHandlerChain errorHandlerChain = null;
+
+ // at this point the error handler chain must exist. See parse() method.
+ errorHandlerChain = aggregate.getDelegateErrorHandlerChain();
+ Map<String, Threshold> processThresholdMap =
+ getThresholdMap(ProcessCasErrorHandler.class,errorHandlerChain);
+ Map<String, Threshold> getMetaThresholdMap =
+ getThresholdMap(GetMetaErrorHandler.class,errorHandlerChain);
+ Map<String, Threshold> cpcThresholdMap =
+ getThresholdMap(CpcErrorHandler.class,errorHandlerChain);
+
+ for( RemoteAnalysisEngineType remote : remoteAnalysisEngineArray) {
+ String key = remote.getKey();
+ Threshold defaultThreshold = Thresholds.newThreshold();
+ RemoteAnalysisEngineDelegate delegate = new RemoteAnalysisEngineDelegate(key);
+ String brokerURL = resolvePlaceholder(remote.getInputQueue().getBrokerURL());
+ String queueName = resolvePlaceholder(remote.getInputQueue().getEndpoint());
+ int prefetch = remote.getInputQueue().getPrefetch();
+ delegate.setBrokerURI(brokerURL);
+ delegate.setQueueName(queueName);
+ delegate.setPrefetch(prefetch);
+ if ( remote.isSetCasMultiplier()) {
+ delegate.setCm(getCasMultiplier(remote.getCasMultiplier()));
+
+ }
+ int consumerCount = remote.getRemoteReplyQueueScaleout();
+ String serialization = SerialFormat.XMI.getDefaultFileExtension(); // default if not specified
+
+ if ( remote.getSerializer() != null ) {
+ serialization = remote.getSerializer().getStringValue();
+ }
+ if ( serialization != null && serialization.trim().length() > 0) {
+ delegate.setSerialization(serialization);
+ }
+
+ // if consumer count is defined override default which is 1
+ if ( consumerCount > 0 ) {
+ delegate.setScaleout(consumerCount);
+ }
+
+ if ( remote.getAsyncAggregateErrorConfiguration() != null ) {
+ Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getGetMetadataErrors(),getMetaThresholdMap);
+ Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getProcessCasErrors(),processThresholdMap);
+ Thresholds.addDelegateErrorThreshold(delegate,remote.getAsyncAggregateErrorConfiguration().getCollectionProcessCompleteErrors(),cpcThresholdMap);
+
+
+ } else {
+ // Error configuration is optional in the DD. Add defaults.
+ getMetaThresholdMap.put(key, defaultThreshold);
+ processThresholdMap.put(key, defaultThreshold);
+ cpcThresholdMap.put(key, defaultThreshold);
+ }
+
+ aggregate.addDelegate(delegate);
+
+ }
+ }
+ private void addColocatedDelegates(DelegateAnalysisEngineType[] analysisEngineArray, AggregateAnalysisEngineDelegate aggregate ) {
+
+
+ // at this point the error handler chain must exist. See parse() method.
+ ErrorHandlerChain errorHandlerChain = aggregate.getDelegateErrorHandlerChain();
+ Map<String, Threshold> processThresholdMap =
+ getThresholdMap(ProcessCasErrorHandler.class,errorHandlerChain);
+ Map<String, Threshold> getMetaThresholdMap =
+ getThresholdMap(GetMetaErrorHandler.class,errorHandlerChain);
+ Map<String, Threshold> cpcThresholdMap =
+ getThresholdMap(CpcErrorHandler.class,errorHandlerChain);
+
+ Threshold defaultThreshold = Thresholds.newThreshold();
+
+ // Add default error handling to each co-located delegate
+ for( DelegateAnalysisEngineType delegate : analysisEngineArray ) {
+ String key = delegate.getKey();
+ getMetaThresholdMap.put(key, defaultThreshold);
+ processThresholdMap.put(key, defaultThreshold);
+ cpcThresholdMap.put(key, defaultThreshold);
+ // recursively iterate over delegates until no more aggregates found
+ aggregate.addDelegate(parse(delegate));
+ }
+ }
+ private boolean isAggregate(AnalysisEngineType aet) {
+ // Is this an aggregate? An aggregate has a property async=true or has delegates.
+ System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() );
+
+ if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
+ return true;
+ }
+ return false;
+ }
+ private ErrorHandlerChain createServiceErrorHandlers() {
+ List<org.apache.uima.aae.error.ErrorHandler> errorHandlers =
+ new ArrayList<>();
+ // There is a dedicated handler for each of GetMeta, ProcessCas, and CPC
+ errorHandlers.add(new GetMetaErrorHandler(new HashMap<String, Threshold>()));
+ errorHandlers.add(new ProcessCasErrorHandler(new HashMap<String, Threshold>()));
+ errorHandlers.add(new CpcErrorHandler(new HashMap<String, Threshold>()));
+
+ return new ErrorHandlerChain(errorHandlers);
+ }
+ private boolean hasCollocatedDelegates( AnalysisEngineType aet) {
+ return ( aet.getDelegates() != null && aet.getDelegates().sizeOfAnalysisEngineArray() > 0 );
+ }
+ private boolean hasRemoteDelegates(AnalysisEngineType aet) {
+ return( aet.getDelegates() != null && aet.getDelegates().sizeOfRemoteAnalysisEngineArray() > 0);
+ }
+ protected AnalysisEngineDelegate parse(AnalysisEngineType aet) {
+ AnalysisEngineDelegate delegate = null;
+
+ if ( isAggregate(aet) ) {
+
+ delegate = new AggregateAnalysisEngineDelegate(aet.getKey());
+
+ ErrorHandlerChain errorHandlerChain = createServiceErrorHandlers();
+ ((AggregateAnalysisEngineDelegate)delegate).setDelegateErrorHandlerChain(errorHandlerChain);
+
+ // The DD object maintains two arrays, one for co-located delegates and the other for remotes.
+ // First handle co-located delegates.
+ if ( hasCollocatedDelegates(aet) ) {
+ DelegateAnalysisEngineType[] localAnalysisEngineArray =
+ aet.getDelegates().getAnalysisEngineArray();
+ addColocatedDelegates(localAnalysisEngineArray,(AggregateAnalysisEngineDelegate)delegate);
+ }
+ // Next add remote delegates of this aggregate
+ if ( hasRemoteDelegates(aet) ) {
+ RemoteAnalysisEngineType[] remoteAnalysisEngineArray =
+ aet.getDelegates().getRemoteAnalysisEngineArray();
+ addRemoteDelegates(remoteAnalysisEngineArray, (AggregateAnalysisEngineDelegate)delegate);
+ }
+ } else {
+ // This is a primitive
+ delegate = new AnalysisEngineDelegate(aet.getKey());
+ }
+ if ( Boolean.parseBoolean(aet.getAsync()) ) {
+ delegate.setAsync(true);
+ }
+
+ if ( aet.getInputQueueScaleout() != null ) {
+ delegate.setScaleout(Integer.valueOf(aet.getInputQueueScaleout()) );
+ }
+ if ( aet.getInternalReplyQueueScaleout() != null ) {
+ delegate.setReplyScaleout(Integer.valueOf(aet.getInternalReplyQueueScaleout()) );
+ }
+
+ if ( aet.getScaleout() != null ) {
+ delegate.setScaleout(aet.getScaleout().getNumberOfInstances() );
+ }
+
+ if ( aet.isSetCasMultiplier() ) {
+ delegate.setCm(getCasMultiplier(aet.getCasMultiplier()));
+ }
+
+ return delegate;
+ }
+
+ protected ServiceType getService(AnalysisEngineDeploymentDescriptionDocument doc) {
+ return doc.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+ }
+ public static String resolvePlaceholder(String placeholderName) {
+ Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
+ Matcher matcher = pattern.matcher(placeholderName);
+ StringBuffer buffer = new StringBuffer();
+
+ while (matcher.find()) {
+ String replacement = getValue(matcher.group(1));
+ if (replacement != null) {
+ matcher.appendReplacement(buffer, "");
+ buffer.append(replacement);
+ }
+ }
+ matcher.appendTail(buffer);
+ return buffer.toString();
+ }
+ public static String getValue(String placeholderName) {
+ System.out.println(".....Finding Match For Placeholder:"+placeholderName);
+ String value = System.getProperty(placeholderName);
+ if (value == null) {
+ // Check the environment
+ value = System.getenv(placeholderName);
+ }
+ return value;
+ }
+ protected boolean isPrimitive(AnalysisEngineDescription aeDescriptor, ServiceType st) {
+ if ( aeDescriptor.isPrimitive()) {
+ return true;
+ }
+ if (st.getAnalysisEngine() != null &&
+ (st.getAnalysisEngine().getAsync() != null &&
+ st.getAnalysisEngine().getAsync().equals("true")) ||
+ st.getAnalysisEngine().getDelegates() != null
+ ) {
+ return false;
+ }
+
+ return true;
+ }
+
+ protected boolean isRemote(AnalysisEngineType type) {
+ if ( type == null ) {
+ return false;
+ }
+ return (type instanceof RemoteAnalysisEngineType);
+ }
+ protected boolean isRemote(DelegatesType dt, String delegateKey) {
+ if ( dt == null || dt.getRemoteAnalysisEngineArray() == null ) {
+ return false; // there are no remote delegates
+ }
+ RemoteAnalysisEngineType[] remoteDelegates = dt.getRemoteAnalysisEngineArray();
+ for( RemoteAnalysisEngineType remoteDelegate : remoteDelegates ) {
+ if ( remoteDelegate.getKey().equals(delegateKey)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ protected class AnalysisEngineDeployment {
+ private Object deploymentConfiguration;
+
+ public AnalysisEngineDeployment( Object aet ) {
+ deploymentConfiguration = aet;
+ }
+
+ public boolean isRemote() {
+ if ( deploymentConfiguration == null || deploymentConfiguration instanceof AnalysisEngineType ) {
+ return false;
+ }
+ return true;
+ }
+ public boolean withConfiguration() {
+ return deploymentConfiguration != null;
+ }
+ public AnalysisEngineType asAnalysisEngineType() {
+ return (AnalysisEngineType)deploymentConfiguration;
+ }
+ public RemoteAnalysisEngineType asRemoteAnalysisEngineType() {
+ return (RemoteAnalysisEngineType)deploymentConfiguration;
+ }
+ public int getScaleout() {
+ int scaleout = 1; // default
+ if ( deploymentConfiguration != null && deploymentConfiguration instanceof AnalysisEngineType ) {
+ if ( ((AnalysisEngineType)deploymentConfiguration).getScaleout() != null ) {
+ scaleout = ((AnalysisEngineType)deploymentConfiguration).getScaleout().getNumberOfInstances();
+ }
+ }
+ return scaleout;
+ }
+ }
+
+ public enum Serialization {
+ XMI, Binary;
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/ServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+public interface ServiceBuilder {
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.builder;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.management.ServiceNotFoundException;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.InputChannel.ChannelType;
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.ErrorHandler;
+import org.apache.uima.aae.error.ErrorHandlerChain;
+import org.apache.uima.aae.error.Threshold;
+import org.apache.uima.aae.error.Thresholds;
+import org.apache.uima.aae.error.handler.CpcErrorHandler;
+import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.service.AsynchronousUimaASService;
+import org.apache.uima.aae.service.ServiceRegistry;
+import org.apache.uima.aae.service.UimaASService;
+import org.apache.uima.aae.service.UimaAsServiceRegistry;
+import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
+import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.as.client.DirectInputChannel;
+import org.apache.uima.as.client.DirectListener;
+import org.apache.uima.as.client.DirectMessage;
+import org.apache.uima.as.client.DirectOutputChannel;
+import org.apache.uima.as.client.Listener.Type;
+import org.apache.uima.resource.ResourceSpecifier;
+import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
+import org.apache.uima.resourceSpecifier.CasPoolType;
+import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
+import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+
+public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder {
+ private static final String NoParent = "NoParent";
+ /*
+ * private static enum FlowControllerType { FIXED }
+ */
+ static Map<String, Object> ddAEMap = new HashMap<>();
+
+ private int scaleout = 1;
+ public static void main(String[] args) {
+ try {
+ String tla = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAEGovNameDetector.xml";
+ String ptDescriptor = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/analysis_engine/PersonTitleAnnotator.xml";
+ // "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/descriptors/tutorial/ex4/MeetingDetectorTAE.xml";
+
+ // String dd1 =
+ // "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopLevelBlueJAggregateCM.xml";
+ String dd2 = "C:/uima/releases/builds/uima-as/2.8.1/uimaj-as-activemq/src/test/resources/deployment/Deploy_TopAggregateWithInnerAggregateCM.xml";
+ String dd = "C:/runtime/uima-as/2.8.1/apache-uima-as-2.8.1-SNAPSHOT/examples/deploy/as/Deploy_MeetingDetectorTAE.xml";
+
+ String dd3 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_PersonTitleAnnotator.xml";
+ String dd4 = "../uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotator.xml";
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback)
+ throws Exception {
+ // get the top level AnalysisEngine descriptor path
+ String aeDescriptorPath = getAEDescriptorPath(dd);
+ // parse AE descriptor
+ ResourceSpecifier resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aeDescriptorPath);
+ validateDD(dd, resourceSpecifier);
+ ServiceType serviceDefinition = getService(dd);
+ AnalysisEngineDelegate topLevelService;
+ // in DD the analysisEngine specification is optional
+ if (serviceDefinition.getAnalysisEngine() == null) {
+ topLevelService = new AnalysisEngineDelegate();
+ topLevelService.setResourceSpecifier((AnalysisEngineDescription) resourceSpecifier);
+ } else {
+ topLevelService = parse(getService(dd).getAnalysisEngine());
+ }
+ // resolve if placeholder used
+ String endpoint = resolvePlaceholder(serviceDefinition.getInputQueue().getEndpoint());
+
+ int howMany = 1;
+ AsynchronousUimaASService service = null;
+
+ // is this the only one resource specifier type supported by the current uima-as?
+ if (resourceSpecifier instanceof AnalysisEngineDescription) {
+ AnalysisEngineDescription aeDescriptor = (AnalysisEngineDescription) resourceSpecifier;
+ // Create a Top Level Service (TLS) wrapper. This wrapper may contain
+ // references to multiple TLS service instances if the TLS is scaled
+ // up.
+ service = new AsynchronousUimaASService(endpoint)
+ .withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+ .withResourceSpecifier(resourceSpecifier).withScaleout(howMany);
+
+ this.buildAndDeploy(dd, topLevelService, service, callback);
+
+
+ }
+ return service;
+ }
+
+ public UimaASService buildAndDeploy(AnalysisEngineDeploymentDescriptionDocument doc, AnalysisEngineDelegate delegate,
+ AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception {
+ // get top level CAS pool to
+ CasPoolType cp = getCasPoolConfiguration(doc);
+
+ super.addEnvironmentVariablesFromDD(doc);
+
+ System.setProperty("BrokerURI", "Direct");
+ // NEED TO INJECT shared InProcessCache here
+ //UimaAsServiceRegistry.getInstance().lookupById("");
+ initialize(service, cp, Transport.Java);
+
+ service.withInProcessCache(super.cache);
+ // Number of Analysis Engine instances
+ int howMany = howManyInstances(doc);
+ if ( howMany == 0 ) {
+ throw new IllegalArgumentException("Number of instances should be greater than zero - check dd");
+ }
+ AnalysisEngineController topLevelController = createController(delegate, service.getResourceSpecifier(),
+ service.getEndpoint(), null, howMany);
+ topLevelController.addControllerCallbackListener(callback);
+
+ topLevelController.setServiceId(service.getId());
+
+ ServiceType s = getService(doc);
+
+ AsyncPrimitiveErrorConfigurationType pec;
+ if (s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration() != null) {
+ pec = s.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration();
+
+ } else {
+ pec = addDefaultErrorHandling(s);
+ }
+
+ configureTopLevelService(topLevelController, service, pec, howMany);
+
+ return service;
+ }
+
+ private void addErrorHandling(AnalysisEngineController topLevelController, AsyncPrimitiveErrorConfigurationType pec) {
+ if (!topLevelController.isPrimitive() && pec != null) {
+
+ ErrorHandlerChain chain = topLevelController.getErrorHandlerChain();
+ Iterator<ErrorHandler> handlers = chain.iterator();
+ while (handlers.hasNext()) {
+ ErrorHandler eh = handlers.next();
+ Map<String, Threshold> map = eh.getEndpointThresholdMap();
+ if (eh instanceof ProcessCasErrorHandler) {
+ if (pec.getProcessCasErrors() != null) {
+ map.put("", Thresholds.getThreshold(pec.getProcessCasErrors().getThresholdAction(),
+ pec.getProcessCasErrors().getMaxRetries()));
+ } else {
+ map.put("", Thresholds.newThreshold());
+ }
+ } else if (eh instanceof GetMetaErrorHandler) {
+ if (pec.getCollectionProcessCompleteErrors() != null) {
+ map.put("", Thresholds.getThreshold("terminate", 0));
+ }
+ } else if (eh instanceof CpcErrorHandler) {
+ map.put("", Thresholds.getThreshold("", 0));
+ }
+ }
+
+ }
+ }
+ private void configureTopLevelService(AnalysisEngineController topLevelController,
+ AsynchronousUimaASService service, AsyncPrimitiveErrorConfigurationType pec, int howMany) throws Exception {
+ addErrorHandling(topLevelController, pec);
+
+
+ // create a single instance of OutputChannel for Direct communication if
+ // necessary
+ DirectOutputChannel outputChannel = null;
+ if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ outputChannel = new DirectOutputChannel().withController(topLevelController);
+ topLevelController.addOutputChannel(outputChannel);
+ } else {
+ outputChannel = (DirectOutputChannel) topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+
+ DirectInputChannel inputChannel;
+ if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+ inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY).withController(topLevelController);
+ Handler messageHandlerChain = getMessageHandler(topLevelController);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ topLevelController.addInputChannel(inputChannel);
+ } else {
+ inputChannel = (DirectInputChannel) topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT);
+ }
+
+ if ( topLevelController instanceof AggregateAnalysisEngineController ) {
+ ((AggregateAnalysisEngineController_impl)topLevelController).setServiceEndpointName(service.getEndpoint());
+ }
+ BlockingQueue<DirectMessage> pQ = null; // service.getProcessRequestQueue();
+ BlockingQueue<DirectMessage> mQ = null; //service.getMetaRequestQueue();
+
+ // Lookup queue name in service registry. If this queue exists, the new service
+ // being
+ // created here will share the same queue to balance the load.
+ UimaASService s;
+ try {
+ s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+ if ( s instanceof AsynchronousUimaASService) {
+// if (s != null && s instanceof AsynchronousUimaASService) {
+ pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue();
+ mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue();
+ }
+
+ } catch( Exception ee) {
+ pQ = service.getProcessRequestQueue();
+ mQ = service.getMetaRequestQueue();
+ }
+
+
+ scaleout = howMany;
+ DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController)
+ .withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+ // withQueue(service.getProcessRequestQueue()).
+ initialize();
+
+ DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController)
+ .withConsumerThreads(1).withInputChannel(inputChannel).
+ // withQueue(service.getMetaRequestQueue()).
+ withQueue(mQ).initialize();
+
+ addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout );
+
+ inputChannel.registerListener(getMetaListener);
+ inputChannel.registerListener(processListener);
+
+ service.withController(topLevelController);
+
+ }
+
+ private void addFreeCASListener( AsynchronousUimaASService service, AnalysisEngineController controller,
+ DirectInputChannel inputChannel, DirectOutputChannel outputChannel, int scaleout ) throws Exception {
+ DirectListener freCASChannelListener = null;
+ if (controller.isCasMultiplier()) {
+ freCASChannelListener = new DirectListener(Type.FreeCAS).withController(controller)
+ .withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(service.getFreeCasQueue())
+ .initialize();
+ inputChannel.registerListener(freCASChannelListener);
+ outputChannel.setFreeCASQueue(service.getFreeCasQueue());
+ }
+ }
+ public static InputChannel createInputChannel(ChannelType type) {
+ return new DirectInputChannel(type);
+ }
+
+
+ public static OutputChannel createOutputChannel(AnalysisEngineController controller) {
+ return new DirectOutputChannel().withController(controller);
+ }
+ private void createDirectOutputChannel(AggregateAnalysisEngineController controller) throws Exception {
+ // there should be one instance of OutputChannel for DIRECT. Create it, if one does not exist
+ if ( controller.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) {
+ OutputChannel oc = createOutputChannel(controller);
+ oc.initialize();
+ controller.addOutputChannel(oc);
+ }
+ }
+ /*
+ private DirectInputChannel createDirectInputChannel(AggregateAnalysisEngineController controller) throws Exception {
+ DirectInputChannel inputChannel;
+ if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+ inputChannel = (DirectInputChannel)createInputChannel(ChannelType.REQUEST_REPLY);
+ ((DirectInputChannel)inputChannel).withController(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ } else {
+ inputChannel = (DirectInputChannel)controller.getInputChannel(ENDPOINT_TYPE.DIRECT );
+ }
+ return inputChannel;
+ }
+*/
+ @Override
+ protected void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception {
+ // there should be one instance of OutputChannel for DIRECT. Create it, if one does not exist
+ createDirectOutputChannel(controller);
+
+ DirectInputChannel inputChannel;
+ if ( (controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null ) {
+ inputChannel =
+ new DirectInputChannel(ChannelType.REQUEST_REPLY).
+ withController(controller);
+ Handler messageHandlerChain = getMessageHandler(controller);
+ inputChannel.setMessageHandler(messageHandlerChain);
+ controller.addInputChannel(inputChannel);
+ } else {
+ inputChannel = (DirectInputChannel)controller.getInputChannel(ENDPOINT_TYPE.DIRECT );
+ }
+ BlockingQueue<DirectMessage> replyQueue =
+ new LinkedBlockingQueue<>();
+ DirectListener processListener =
+ new DirectListener(Type.Reply).
+ withController(controller).
+ withConsumerThreads(remoteDelegate.getReplyScaleout()).
+ withInputChannel(inputChannel).
+ withQueue(replyQueue).
+ initialize();
+ inputChannel.registerListener(processListener);
+
+ UimaASService service =
+ UimaAsServiceRegistry.getInstance().lookupByEndpoint(endpoint.getEndpoint());
+ if ( service != null && service instanceof AsynchronousUimaASService ) {
+ endpoint.setGetMetaDestination(((AsynchronousUimaASService)service).getMetaRequestQueue());
+ endpoint.setDestination(((AsynchronousUimaASService)service).getProcessRequestQueue());
+ endpoint.setReplyDestination(replyQueue);
+ }
+
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.SerializerCache;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.UimaSerializer;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.DelegateStats;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.aae.monitor.statistics.TimerStats;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.Marker;
+import org.apache.uima.cas.SerialFormat;
+import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
+import org.apache.uima.cas.impl.Serialization;
+import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.util.Level;
+
+public abstract class AbstractUimaAsCommand implements UimaAsCommand {
+ protected AnalysisEngineController controller;
+ private Object mux = new Object();
+
+ protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+ this.controller = controller;
+ }
+
+ protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException {
+ if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+ if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(),
+ "getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_message_has_cas_refid__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ }
+ return null;
+ }
+ return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ }
+
+ protected CacheEntry getCacheEntryForCas(String casReferenceId) {
+ try {
+ return controller.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ } catch (AsynchAEException e) {
+ return new InProcessCache.UndefinedCacheEntry();
+ }
+ }
+
+ protected CasStateEntry createCasStateEntry(String casReferenceId) {
+ return controller.getLocalCache().createCasStateEntry(casReferenceId);
+ }
+
+ protected CasStateEntry getCasStateEntry(String casReferenceId) {
+ CasStateEntry casStateEntry = null;
+ if ((casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId)) == null) {
+ // Create a new entry in the local cache for the CAS received from the remote
+ casStateEntry = createCasStateEntry(casReferenceId);
+ }
+ return casStateEntry;
+ }
+
+ protected boolean isTopLevelAggregate() {
+ return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController);
+ }
+
+ protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) {
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+ controller.getComponentName());
+
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
+ controller.dropCAS(cacheEntry.getCas());
+ controller.getErrorHandlerChain().handle(e, errorContext, controller);
+
+ }
+
+ protected void saveStats(CacheEntry entry, long inTime, long t1, long timeWaitingForCAS) {
+ long timeToDeserializeCAS = controller.getCpuTime() - t1;
+ controller.incrementDeserializationTime(timeToDeserializeCAS);
+ entry.incrementTimeToDeserializeCAS(timeToDeserializeCAS);
+ entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+
+ LongNumericStatistic statistic;
+ if ((statistic = controller.getMonitor().getLongNumericStatistic("", Monitor.TotalDeserializeTime)) != null) {
+ statistic.increment(timeToDeserializeCAS);
+ }
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "saveStats",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
+ new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
+ }
+
+ // Update Stats
+ ServicePerformance casStats = controller.getCasStatistics(entry.getCasReferenceId());
+ casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+ if (controller.isTopLevelComponent()) {
+ synchronized (mux) {
+ controller.getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
+ }
+ }
+ controller.saveTime(inTime, entry.getCasReferenceId(), controller.getName());
+ if (!controller.isPrimitive()) {
+ DelegateStats stats = new DelegateStats();
+ if (entry.getStat() == null) {
+ entry.setStat(stats);
+ // Add entry for self (this aggregate). MessageContext.getEndpointName()
+ // returns the name of the queue receiving the message.
+ stats.put(controller.getServiceEndpointName(), new TimerStats());
+ } else {
+ if (!stats.containsKey(controller.getServiceEndpointName())) {
+ stats.put(controller.getServiceEndpointName(), new DelegateStats());
+ }
+ }
+ }
+
+ }
+
+ protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) {
+ ErrorContext errorContext = new ErrorContext();
+ if (aMessageCtx != null) {
+ try {
+ if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+ errorContext.add(AsynchAEMessage.Command,
+ aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+ }
+
+ if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+ errorContext.add(AsynchAEMessage.MessageType,
+ aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+ }
+
+ if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+ errorContext.add(AsynchAEMessage.CasReference,
+ aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+ }
+ errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage());
+ } catch (Exception e) { /* ignore */
+ }
+ }
+ return errorContext;
+ }
+
+ protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException {
+ Endpoint endpoint = null;
+ String parentId = parentCasId;
+ // Loop through the parent tree until an origin is found
+ while (parentId != null) {
+ // Check if the current parent has an associated origin. Only input CAS
+ // has an origin of the request. The origin is an endpoint of a client
+ // who sent an input CAS for processing
+ endpoint = ((AggregateAnalysisEngineController) controller).getMessageOrigin(parentId);
+ // Check if there is an origin. If so, we are done
+ if (endpoint != null) {
+ break;
+ }
+ // The current parent has no origin, get its parent and try again
+ CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(parentId);
+ parentId = entry.getInputCasReferenceId();
+ }
+ return endpoint;
+ }
+
+ protected CAS getNewCAS(CASFactory factory, String casRequestorOrigin) {
+ CAS cas = null;
+ // Aggregate time spent waiting for a CAS in the service cas pool
+ controller.getServicePerformance().beginWaitOnCASPool();
+ if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(this.getClass()).logrb(Level.FINE, this.getClass().getName(), "deserializeChildCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
+ new Object[] { casRequestorOrigin });
+ }
+ cas = factory.newCAS();
+
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "deserializeChildCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
+ new Object[] { casRequestorOrigin });
+ }
+
+ controller.getServicePerformance().endWaitOnCASPool();
+
+ ServicePerformance sp = controller.getServicePerformance();
+ sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS());
+
+ return cas;
+ }
+
+ protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint,
+ MessageContext mc) throws Exception {
+ SerializationResult result = new SerializationResult();
+
+ // Aggregate time spent waiting for a CAS in the shadow cas pool
+ ((AggregateAnalysisEngineController) controller).getDelegateServicePerformance(casMultiplierDelegateKey)
+ .beginWaitOnShadowCASPool();
+
+ long t1 = controller.getCpuTime();
+ CAS cas = getNewCAS(new ChildCASFactory(casMultiplierDelegateKey), casMultiplierDelegateKey);
+ result.setCas(cas);
+
+ result.setTimeWaitingForCAS(controller.getCpuTime() - t1);
+
+ ((AggregateAnalysisEngineController) controller).getDelegateServicePerformance(casMultiplierDelegateKey)
+ .endWaitOnShadowCASPool();
+
+ // Check if we are still running
+ if (controller.isStopped()) {
+ // The Controller is in shutdown state.
+ controller.dropCAS(cas);
+ return null;
+ }
+ // Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add
+ // a new serialization format add a new class which implements
+ // UimaASDeserializer and modify DeserializerFactory class.
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ deserializer.deserialize(result);
+
+ return result;
+ }
+
+ protected SerializationResult deserializeInputCAS( MessageContext mc)
+ throws Exception {
+ SerializationResult result = new SerializationResult();
+ String origin = mc.getEndpoint().getEndpoint();
+ Endpoint endpoint = mc.getEndpoint();
+
+ // Time how long we wait on Cas Pool to fetch a new CAS
+ long t1 = controller.getCpuTime();
+ CAS cas = getNewCAS(new InputCASFactory(), origin);
+ result.setCas(cas);
+
+ result.setTimeWaitingForCAS(controller.getCpuTime() - t1);
+
+ // Check if we are still running
+ if (controller.isStopped()) {
+ // The Controller is in shutdown state.
+ controller.dropCAS(cas);
+ return null;
+ }
+
+ UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc);
+ deserializer.deserialize(result);
+
+ return result;
+ }
+ protected Delegate getDelegate(MessageContext mc) throws AsynchAEException {
+ String delegateKey = null;
+ if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
+ String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(fromEndpoint);
+ } else {
+ delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+ }
+ return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
+ }
+
+ public static class DeserializerFactory {
+ // only static reference allowed
+ private DeserializerFactory() {
+ }
+ public static UimaASDeserializer newDeserializer(Endpoint endpoint, MessageContext mc) throws AsynchAEException {
+ switch (endpoint.getSerialFormat()) {
+ case XMI:
+ return new XMIDeserializer(mc.getStringMessage());
+ case BINARY:
+ return new BinaryDeserializer(mc.getByteMessage(), endpoint);
+ case COMPRESSED_FILTERED:
+ return new CompressedFilteredDeserializer(mc.getByteMessage(), endpoint);
+ default:
+ throw new AsynchAEException("Never Happen");
+
+ }
+ }
+ }
+
+ public static interface UimaASDeserializer {
+ public void deserialize(SerializationResult result) throws Exception;
+ }
+
+ public static class CompressedFilteredDeserializer implements UimaASDeserializer {
+ byte[] binarySource;
+ Endpoint endpoint;
+
+ public CompressedFilteredDeserializer(byte[] binarySource, Endpoint endpoint) {
+ this.binarySource = binarySource;
+ this.endpoint = endpoint;
+ }
+
+ public void deserialize(SerializationResult result) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(binarySource);
+ ReuseInfo reuseInfo = Serialization
+ .deserializeCAS(result.getCas(), bais, endpoint.getTypeSystemImpl(), null).getReuseInfo();
+ result.setReuseInfo(reuseInfo);
+ }
+ }
+
+ public static class BinaryDeserializer implements UimaASDeserializer {
+ byte[] binarySource;
+ Endpoint endpoint;
+
+ public BinaryDeserializer(byte[] binarySource, Endpoint endpoint) {
+ this.binarySource = binarySource;
+ this.endpoint = endpoint;
+ }
+
+ public void deserialize(SerializationResult result) throws Exception {
+ UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId();
+ // BINARY format may be COMPRESSED etc, so update it upon reading
+ SerialFormat serialFormat =
+ uimaSerializer.deserializeCasFromBinary(binarySource, result.getCas());
+ // BINARY format may be COMPRESSED etc, so update it upon reading
+ endpoint.setSerialFormat(serialFormat);
+ }
+ }
+
+ public static class XMIDeserializer implements UimaASDeserializer {
+ String xmi;
+
+ public XMIDeserializer(String xmi) {
+ this.xmi = xmi;
+ }
+
+ public void deserialize(SerializationResult result) throws Exception {
+ UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId();
+ XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+ uimaSerializer.deserializeCasFromXmi(xmi, result.getCas(), deserSharedData, true, -1);
+ result.setDeserSharedData(deserSharedData);
+ }
+
+ }
+
+ public static class SerializationResult {
+ Marker marker = null;
+ CAS cas = null;
+ XmiSerializationSharedData deserSharedData = null;
+ ReuseInfo reuseInfo = null;
+ boolean acceptsDeltaCas = false;
+ long timeWaitingForCAS = 0;
+
+ public long getTimeWaitingForCAS() {
+ return timeWaitingForCAS;
+ }
+
+ public void setTimeWaitingForCAS(long timeWaitingForCAS) {
+ this.timeWaitingForCAS = timeWaitingForCAS;
+ }
+
+ public boolean acceptsDeltaCas() {
+ return acceptsDeltaCas;
+ }
+
+ public void setAcceptsDeltaCas(boolean acceptsDeltaCas) {
+ this.acceptsDeltaCas = acceptsDeltaCas;
+ }
+
+ public Marker getMarker() {
+ return marker;
+ }
+
+ public void setMarker(Marker marker) {
+ this.marker = marker;
+ }
+
+ public CAS getCas() {
+ return cas;
+ }
+
+ public void setCas(CAS cas) {
+ this.cas = cas;
+ }
+
+ public XmiSerializationSharedData getDeserSharedData() {
+ return deserSharedData;
+ }
+
+ public void setDeserSharedData(XmiSerializationSharedData deserSharedData) {
+ this.deserSharedData = deserSharedData;
+ }
+
+ public ReuseInfo getReuseInfo() {
+ return reuseInfo;
+ }
+
+ public void setReuseInfo(ReuseInfo reuseInfo) {
+ this.reuseInfo = reuseInfo;
+ }
+
+ }
+
+ public interface CASFactory {
+ public CAS newCAS();
+
+ }
+
+ public class ChildCASFactory implements CASFactory {
+ String casMultiplierDelegateKey;
+
+ public ChildCASFactory(String casMultiplierDelegateKey) {
+ this.casMultiplierDelegateKey = casMultiplierDelegateKey;
+ }
+
+ public CAS newCAS() {
+ return controller.getCasManagerWrapper().getNewCas(casMultiplierDelegateKey);
+ }
+
+ }
+
+ public class InputCASFactory implements CASFactory {
+ public CAS newCAS() {
+ return controller.getCasManagerWrapper().getNewCas();
+ }
+
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.MessageContext;
+
+public class CollectionProcessCompleteRequestCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+ public void execute() throws Exception {
+ Endpoint endpoint = mc.getEndpoint();
+ controller.collectionProcessComplete(endpoint);
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+public class CollectionProcessCompleteResponseCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+ public void execute() throws Exception {
+ Delegate delegate = super.getDelegate(mc);
+ try {
+ ((AggregateAnalysisEngineController)controller)
+ .processCollectionCompleteReplyFromDelegate(delegate.getKey(), true);
+ } catch (Exception e) {
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete);
+ errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+ controller.getErrorHandlerChain().handle(e, errorContext, controller);
+ }
+ }
+}