You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [9/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uima...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+/**
+ * Static factory that picks the {@link UimaAsCommand} for an incoming message from the
+ * message's type (request or response) and its command property. Any combination that is
+ * not recognized is mapped to a no-op command.
+ */
+public class CommandFactory {
+  // Static-only factory: never instantiated.
+  private CommandFactory() {
+  }
+
+  /**
+   * Creates the command that matches the message type and command carried by the message.
+   *
+   * @param mc message context the type and command properties are read from
+   * @param controller controller handed to the created command
+   * @return a request or response command, or a no-op command when the message type is
+   *         neither Request nor Response
+   * @throws AsynchAEException propagated from the message context accessors
+   */
+  public static UimaAsCommand newCommand(MessageContext mc, AnalysisEngineController controller)
+      throws AsynchAEException {
+    int messageType = mc.getMessageIntProperty(AsynchAEMessage.MessageType);
+    if (messageType == AsynchAEMessage.Request) {
+      return newRequestCommand(mc, controller);
+    }
+    if (messageType == AsynchAEMessage.Response) {
+      return newResponseCommand(mc, controller);
+    }
+    // Neither a request nor a response.
+    return CommandBuilder.createNoOpCommand(mc, controller);
+  }
+
+  /**
+   * Selects the command for a Request message. A Process request that carries the
+   * CasSequence property gets the child-CAS command, otherwise the input-CAS command.
+   */
+  private static UimaAsCommand newRequestCommand(MessageContext mc,
+      AnalysisEngineController controller) throws AsynchAEException {
+    switch (mc.getMessageIntProperty(AsynchAEMessage.Command)) {
+      case AsynchAEMessage.Process:
+        return mc.propertyExists(AsynchAEMessage.CasSequence)
+            ? CommandBuilder.createProcessChildCasRequestCommand(mc, controller)
+            : CommandBuilder.createProcessInputCasRequestCommand(mc, controller);
+      case AsynchAEMessage.GetMeta:
+        return CommandBuilder.createGetMetaRequestCommand(mc, controller);
+      case AsynchAEMessage.CollectionProcessComplete:
+        return CommandBuilder.createCollectionProcessCompleteRequestCommand(mc, controller);
+      case AsynchAEMessage.ReleaseCAS:
+        return CommandBuilder.createReleaseCASRequestCommand(mc, controller);
+      default:
+        return CommandBuilder.createNoOpCommand(mc, controller);
+    }
+  }
+
+  /**
+   * Selects the command for a Response message. A Process response that carries the
+   * CasSequence property gets the child-CAS command, otherwise the input-CAS command.
+   */
+  private static UimaAsCommand newResponseCommand(MessageContext mc,
+      AnalysisEngineController controller) throws AsynchAEException {
+    switch (mc.getMessageIntProperty(AsynchAEMessage.Command)) {
+      case AsynchAEMessage.Process:
+        return mc.propertyExists(AsynchAEMessage.CasSequence)
+            ? CommandBuilder.createProcessChildCasResponseCommand(mc, controller)
+            : CommandBuilder.createProcessInputCasResponseCommand(mc, controller);
+      case AsynchAEMessage.GetMeta:
+        return CommandBuilder.createGetMetaResponseCommand(mc, controller);
+      case AsynchAEMessage.CollectionProcessComplete:
+        return CommandBuilder.createCollectionProcessCompleteResponseCommand(mc, controller);
+      default:
+        return CommandBuilder.createNoOpCommand(mc, controller);
+    }
+  }
+
+  /** Holds one creation method per concrete command class. */
+  private static class CommandBuilder {
+    // Static-only helper: never instantiated.
+    private CommandBuilder() {
+    }
+
+    static UimaAsCommand createProcessChildCasRequestCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new ProcessChildCasRequestCommand(mc, controller);
+    }
+
+    static UimaAsCommand createProcessInputCasRequestCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new ProcessInputCasRequestCommand(mc, controller);
+    }
+
+    static UimaAsCommand createProcessChildCasResponseCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new ProcessChildCasResponseCommand(mc, controller);
+    }
+
+    static UimaAsCommand createProcessInputCasResponseCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new ProcessInputCasResponseCommand(mc, controller);
+    }
+
+    static UimaAsCommand createGetMetaResponseCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new GetMetaResponseCommand(mc, controller);
+    }
+
+    static UimaAsCommand createGetMetaRequestCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new GetMetaRequestCommand(mc, controller);
+    }
+
+    static UimaAsCommand createReleaseCASRequestCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new ReleaseCASRequestCommand(mc, controller);
+    }
+
+    static UimaAsCommand createNoOpCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new NoOpCommand(mc, controller);
+    }
+
+    static UimaAsCommand createCollectionProcessCompleteRequestCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new CollectionProcessCompleteRequestCommand(mc, controller);
+    }
+
+    static UimaAsCommand createCollectionProcessCompleteResponseCommand(MessageContext mc,
+        AnalysisEngineController controller) {
+      return new CollectionProcessCompleteResponseCommand(mc, controller);
+    }
+  }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+/**
+ * Command that handles a GetMeta request: once the controller has finished initializing,
+ * it asks the controller to send its metadata to the requesting endpoint.
+ */
+public class GetMetaRequestCommand extends AbstractUimaAsCommand {
+  private final MessageContext mc;
+
+  /**
+   * @param mc message context of the GetMeta request
+   * @param controller controller passed on to the superclass
+   */
+  public GetMetaRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+    super(controller);
+    this.mc = mc;
+  }
+
+  /**
+   * Caches the requester as the client endpoint when this controller is a top-level
+   * component, waits on the controller latch, and then sends the metadata unless the
+   * controller was stopped in the meantime.
+   */
+  public void execute() throws Exception {
+    Endpoint requester = mc.getEndpoint();
+    if (controller.isTopLevelComponent()) {
+      requester.setCommand(AsynchAEMessage.GetMeta);
+      controller.cacheClientEndpoint(requester);
+    }
+    if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.FINEST)) {
+      UIMAFramework.getLogger(this.getClass()).logrb(
+          Level.FINEST,
+          this.getClass().getName(),
+          "execute",
+          UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+          "UIMAEE_handling_metadata_request__FINEST",
+          new Object[] { requester.getEndpoint() });
+    }
+    controller.getControllerLatch().waitUntilInitialized();
+    // The controller may have been stopped while this thread was blocked on the latch.
+    if (controller.isStopped()) {
+      return;
+    }
+    System.out.println("............................ Service:"+controller.getComponentName()+" Dispatching Metadata to Client");
+    controller.sendMetadata(requester);
+  }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.resource.metadata.ResourceMetaData;
+import org.apache.uima.util.XMLInputSource;
+
+/**
+ * Command that handles a GetMeta response: it marks the replying delegate as running and
+ * merges the returned metadata into the aggregate controller's type system.
+ */
+public class GetMetaResponseCommand extends AbstractUimaAsCommand {
+  private final MessageContext mc;
+
+  /**
+   * @param mc message context of the GetMeta response
+   * @param controller controller passed on to the superclass; {@link #execute()} casts it to
+   *        {@link AggregateAnalysisEngineController}
+   */
+  public GetMetaResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+    super(controller);
+    this.mc = mc;
+  }
+
+  /**
+   * Processes the response. A response whose payload is an exception is ignored.
+   */
+  public void execute() throws Exception {
+    System.out.println(".......... GetMetaResponseCommand.execute()- handling GetMeta response - Controller:"+controller.getName());
+    if (mc.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.Exception) {
+      return;
+    }
+
+    String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+    AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+    String delegateKey = aggregate.lookUpDelegateKey(fromEndpoint);
+
+    // The lookup of the remote's SERIALIZATION property is disabled, so this value is
+    // always None and the XML-parsing branch below is currently never taken.
+    int serializationSupportedByRemote = AsynchAEMessage.None;
+    ResourceMetaData resource;
+    if (serializationSupportedByRemote == AsynchAEMessage.None) {
+      resource = (ResourceMetaData) mc.getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+    } else {
+      String analysisEngineMetadata = mc.getStringMessage();
+      ByteArrayInputStream bis = new ByteArrayInputStream(analysisEngineMetadata.getBytes());
+      XMLInputSource in1 = new XMLInputSource(bis, null);
+      resource = UIMAFramework.getXMLParser().parseResourceMetaData(in1);
+    }
+
+    // The server property is optional; null is passed on when the message lacks it.
+    String fromServer = mc.propertyExists(AsynchAEMessage.EndpointServer)
+        ? mc.getMessageStringProperty(AsynchAEMessage.EndpointServer)
+        : null;
+
+    aggregate.changeCollocatedDelegateState(delegateKey, ServiceState.RUNNING);
+    // If old service does not echo back the external broker name then the queue name must
+    // be unique.
+    // The ServerURI set by the service may be its local name for the broker, e.g.
+    // tcp://localhost:61616
+    aggregate.mergeTypeSystem(resource, fromEndpoint, fromServer);
+    aggregate.setRemoteSerializationSupported(serializationSupportedByRemote, fromEndpoint, fromServer);
+  }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+
+public class NoOpCommand extends AbstractUimaAsCommand {
+ MessageContext mc;
+ public NoOpCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+ public void execute() throws Exception {
+ System.out.println("*******************************************************"
+ + "\nNoOpCommand.execute() - Either wrong command or message type - Command:"+
+ mc.getMessageIntProperty(AsynchAEMessage.Command) + " MessageType:"+
+ mc.getMessageIntProperty(AsynchAEMessage.MessageType) + " Service:"+controller.getComponentName() +
+ "\n*******************************************************");
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+/**
+ * Command that answers a Ping request by sending a Ping reply to the requesting endpoint.
+ */
+public class PingRequestCommand extends AbstractUimaAsCommand {
+  private final MessageContext mc;
+
+  /**
+   * @param mc message context of the Ping request
+   * @param controller controller passed on to the superclass
+   */
+  public PingRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+    super(controller);
+    this.mc = mc;
+  }
+
+  /**
+   * Sends the Ping reply over the JMS output channel for a remote endpoint and over the
+   * direct channel otherwise. Failures are logged at WARNING level and not rethrown.
+   */
+  public void execute() throws Exception {
+    try {
+      ENDPOINT_TYPE et = mc.getEndpoint().isRemote() ? ENDPOINT_TYPE.JMS : ENDPOINT_TYPE.DIRECT;
+      controller.getOutputChannel(et).sendReply(AsynchAEMessage.Ping, mc.getEndpoint(), null, false);
+    } catch (Exception e) {
+      if (!UIMAFramework.getLogger(this.getClass()).isLoggable(Level.WARNING)) {
+        return;
+      }
+      // The failure may itself stem from a missing controller, hence the null check.
+      if (controller != null) {
+        UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), "execute",
+            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+            controller.getComponentName());
+      }
+      UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, getClass().getName(), "execute",
+          UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+    }
+  }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.Marker;
+import org.apache.uima.util.Level;
+
+public class ProcessChildCasRequestCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ /**
+  * Creates the command for a Process request that carries a child CAS.
+  *
+  * @param mc message context of the incoming request; kept for use by execute()
+  * @param controller controller passed on to the superclass
+  */
+ public ProcessChildCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+
+ /**
+  * Handles a Process request for a child CAS. The message is ignored when the parent or
+  * child CAS reference id is missing. A CAS passed by reference (payload CASRefID) is
+  * handled by executeDirectRequest, any other payload by executeRemoteRequest.
+  */
+ public void execute() throws Exception {
+   int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+   String childId = super.getCasReferenceId(this.getClass(), mc);
+   String parentId = mc.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+   System.out.println(">>>>>>>>>>>>>>> Controller:"+controller.getComponentName()+
+       " in ProcessChildCasRequestCommand.execute() - Child CAS:"+childId+
+       " Parent CAS:"+parentId+
+       " from "+mc.getMessageStringProperty(AsynchAEMessage.MessageFrom));
+
+   if (parentId == null) {
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+       UIMAFramework.getLogger(getClass()).logrb(
+           Level.INFO,
+           getClass().getName(),
+           "execute",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+           "UIMAEE_input_cas_invalid__INFO",
+           new Object[] { controller.getComponentName(), mc.getEndpointName(), parentId });
+     }
+     System.out.println(
+         "ProcessChildCasRequestCommand.execute() - Parent CasReferenceId is missing in MessageContext - Ignoring Message");
+     return;
+   }
+   if (childId == null) {
+     System.out.println(
+         "ProcessChildCasRequestCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+     return;
+   }
+
+   // Record the Process command on the client endpoint, when there is one.
+   Endpoint client = controller.getClientEndpoint();
+   if (client != null) {
+     client.setCommand(AsynchAEMessage.Process);
+   }
+
+   if (AsynchAEMessage.CASRefID != payload) {
+     // Not passed by reference: an XMI payload without a message body has nothing to process.
+     boolean xmiPayload =
+         mc.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload;
+     if (xmiPayload && mc.getStringMessage() == null) {
+       return;
+     }
+     System.out.println(
+         "ProcessChildCasRequestCommand.execute() - Child CasReferenceId:"+childId+" From Remote CM");
+     executeRemoteRequest(childId, parentId);
+     return;
+   }
+
+   // The CAS was passed by reference.
+   System.out.println(
+       "ProcessChildCasRequestCommand.execute() - Child CasReferenceId:"+childId+" From Co-located CM");
+   if (mc.getEndpoint() == null) {
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+       UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
+           new Object[] { childId });
+     }
+     return;
+   }
+   executeDirectRequest(childId, parentId);
+ }
+ /**
+  * When the message carries a FreeCASQueue property, builds an endpoint for that queue and
+  * stores it on the child CAS state entry as its free-CAS notification endpoint.
+  *
+  * @param childCasStateEntry state entry of the child CAS that receives the endpoint
+  * @throws AsynchAEException propagated from the message context accessors
+  */
+ private void saveFreeCasDestination(CasStateEntry childCasStateEntry) throws AsynchAEException {
+   if (mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue) == null) {
+     return;
+   }
+   Object queue = mc.getMessageObjectProperty(AsynchAEMessage.FreeCASQueue);
+   Endpoint freeCasEndpoint = new Endpoint_impl();
+   freeCasEndpoint.setServerURI("java");
+   freeCasEndpoint.setFreeCasEndpoint(true);
+   freeCasEndpoint.setJavaRemote();
+   freeCasEndpoint.setDestination(queue);
+   // the aggregate will send FREE CAS request to the delegate deployed
+   // in the same JVM
+   childCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
+ }
+ /**
+  * Looks up the parent CAS entry in the controller's local cache.
+  *
+  * @param parentCasReferenceId id of the parent CAS
+  * @param delegateCasMultiplier not used by this method
+  * @return the parent's state entry, never null
+  * @throws AsynchAEException if the lookup yields no entry for the parent CAS
+  */
+ private CasStateEntry getParentCasStateEntry(String parentCasReferenceId, Delegate delegateCasMultiplier ) throws AsynchAEException {
+   // A child CAS was generated from its parent, so the parent entry is expected to exist.
+   CasStateEntry entry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
+   if (entry != null) {
+     return entry;
+   }
+   throw new AsynchAEException("Parent CAS "+parentCasReferenceId+ " Not Found in InprocessCache");
+ }
+ /**
+  * Registers the origin endpoint of the parent CAS as the message origin of the child CAS.
+  *
+  * <p>The origin is registered exactly once, and only when it was found. Previously a CAS
+  * multiplier controller registered it before the null check, so a missing origin was
+  * passed to addMessageOrigin as null and a found origin was registered twice.
+  *
+  * @param childCasReferenceId id of the child CAS to associate the origin with
+  * @param parentCasReferenceId id of the parent CAS whose origin is looked up
+  * @param delegateCasMultiplier delegate whose key is reported in the FINEST log entry
+  * @throws AsynchAEException declared for the lookup and registration calls
+  */
+ private void associateInputCASOriginWithChildCAS(String childCasReferenceId, String parentCasReferenceId, Delegate delegateCasMultiplier )
+     throws AsynchAEException {
+   Endpoint inputCasOrigin = fetchParentCasOrigin(parentCasReferenceId);
+   if (inputCasOrigin == null) {
+     // Nothing to associate; report the missing origin and leave the child unregistered.
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+       UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_not_found__INFO",
+           new Object[] { controller.getComponentName(), parentCasReferenceId });
+     }
+     return;
+   }
+
+   // This single registration also covers the case where this aggregate is a CAS
+   // multiplier and the child CAS may be sent to its client.
+   ((AggregateAnalysisEngineController) controller).addMessageOrigin(childCasReferenceId, inputCasOrigin);
+   if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINEST)) {
+     UIMAFramework.getLogger(getClass()).logrb(Level.FINEST, getClass().getName(), "executeDirectRequest",
+         UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_added__FINEST", new Object[] {
+             controller.getComponentName(), childCasReferenceId, delegateCasMultiplier.getKey() });
+   }
+ }
+ /**
+  * Processes a child CAS that was passed by reference. The child's state entry is linked
+  * to its delegate, its parent CAS and the parent's client endpoint and input CAS, and the
+  * CAS is then handed to the aggregate controller for processing. Any exception raised on
+  * the way is passed to handleException instead of being rethrown.
+  *
+  * @param childCasReferenceId id of the child CAS
+  * @param parentCasReferenceId id of the CAS the child was produced from
+  */
+ private void executeDirectRequest(String childCasReferenceId, String parentCasReferenceId) {
+ CAS cas = null;
+ try {
+ // if the child CAS was produced by a co-located delegate CM
+ // of this aggregate, the CasStateEntry instance has already
+ // been created for the child by the CM controller in its
+ // process() method. If the child CAS was
+ // produced by an *aggregate* CM delegate, a new CasStateEntry
+ // needs to be created. The CasStateEntry is local to each
+ // aggregate.
+ CasStateEntry childCasStateEntry = getCasStateEntry(childCasReferenceId);
+
+ // get the delegate CM where the child CAS was produced
+ Delegate delegateCasMultiplier = getLastDelegate(childCasStateEntry);
+ // associate the delegate CM with the child CAS
+ setLastDelegate(delegateCasMultiplier, childCasStateEntry);
+
+ // the parent CAS must be in this controller's local cache
+ CasStateEntry parentCasStateEntry =
+ getParentCasStateEntry(parentCasReferenceId, delegateCasMultiplier );
+
+ // if a child CAS has been created in a primitive CM delegate of
+ // this aggregate the parent CAS id has already been assigned by
+ // the CM. If the child CAS was created by a remote java CM delegate
+ // we need to associate the child with its parent CAS here. Remote
+ // java CM is deployed in the same JVM but is considered as remote
+ if (childCasStateEntry.getParentCasReferenceId() == null) {
+ childCasStateEntry.setParentCasReferenceId(parentCasReferenceId);
+ }
+ // Check if the parent CAS is in a failed state first
+ if (parentCasStateEntry.isFailed()) {
+ // handle CAS release
+ controller.process(null, childCasReferenceId);
+ return;
+ }
+ // if this aggregate is a CM, we may send this child CAS to the aggregate's client.
+ // Associate this aggregate client with the child CAS
+ childCasStateEntry.setClientEndpoint(parentCasStateEntry.getClientEndpoint());
+ // every child CAS is associated with an input CAS. An input CAS may be different
+ // from the child CAS' parent CAS if this aggregate has more than one CM. An input
+ // CAS is a top ancestor from which child CASes are produced.
+ childCasStateEntry.setInputCasReferenceId(parentCasStateEntry.getInputCasReferenceId());
+
+ mc.getEndpoint().setIsCasMultiplier(true);
+ associateInputCASOriginWithChildCAS(childCasReferenceId, parentCasReferenceId, delegateCasMultiplier );
+
+ // NOTE(review): e is not null-checked before e.isJavaRemote() below; if no destination
+ // is registered under the delegate key this presumably raises a NullPointerException,
+ // which the catch block at the end of this method would pass to handleException.
+ Endpoint e = ((AggregateAnalysisEngineController_impl)controller).
+ getDestinations().get(delegateCasMultiplier.getKey());
+
+ // delegate of this aggregate can be deployed in the same JVM but its deployed
+ // with a Java queue instead of JMS. Such service is called JavaRemote delegate.
+ // This is an independent CM service and we need to send explicit FreeCAS request
+ // for every child CAS received from there. Its like a remote but runs in the
+ // same JVM as this aggregate.
+ if ( e.isJavaRemote() ) {
+ saveFreeCasDestination(childCasStateEntry);
+ // Increment parent CAS child count. This is needed to determine when the parent
+ // can be released. Only if child count is zero, the parent CAS can be released.
+ parentCasStateEntry.incrementSubordinateCasInPlayCount();
+ //System.out.println("..Controller:"+controller.getComponentName()+" Processing Child CAS:"+childCasReferenceId+" Incremented Parent CAS:"+parentCasReferenceId+ " Child Count to:"+parentCasStateEntry.getSubordinateCasInPlayCount());
+ }
+ // Record the arrival time of the child CAS under this controller's name
+ long arrivalTime = System.nanoTime();
+ controller.saveTime(arrivalTime, childCasReferenceId, controller.getName());
+
+ // Save Process command in the client endpoint.
+ Endpoint clientEndpoint = controller.getClientEndpoint();
+ if (clientEndpoint != null) {
+ clientEndpoint.setCommand(AsynchAEMessage.Process);
+ }
+
+ // Do not fetch or process the CAS once the controller has been stopped
+ if (controller.isStopped()) {
+ return;
+ }
+ cas = controller.getInProcessCache().getCasByReference(childCasReferenceId);
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+ new Object[] { childCasReferenceId });
+ }
+ // Diagnostic trace written to standard output before the CAS is dispatched
+ StringBuilder sb = new StringBuilder();
+ sb.append("############ Controller:").append(controller.getComponentName()).
+ append(" Processing Child CAS:").append(childCasReferenceId).append(" From Parent CAS:").
+ append(parentCasReferenceId).append(" Child Count to:").append(parentCasStateEntry.getSubordinateCasInPlayCount());
+ System.out.println(sb.toString());
+ ((AggregateAnalysisEngineController) controller).process(cas, parentCasReferenceId,
+ childCasReferenceId, delegateCasMultiplier.getKey());
+
+ } catch (Exception e) {
+ // Log the failure and hand it to the controller's error handler chain
+ handleException(e);
+ }
+ }
+ private void handleException(Exception e) {
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+ controller.getComponentName());
+
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ if ( !(e instanceof AsynchAEException) ) {
+ e = new AsynchAEException(e);
+ }
+ controller.getErrorHandlerChain().handle(e, super.populateErrorContext(mc), controller);
+
+ e.printStackTrace();
+
+ }
+ /**
+  * Handles a child CAS that arrived in serialized form from a Cas Multiplier delegate.
+  * <p>
+  * The method records the Free Cas notification endpoint on the parent CAS, updates
+  * statistics, redirects the reply endpoint to the origin of the parent CAS, links the
+  * child CAS to its parent, deserializes the child CAS, registers it in the in-process
+  * cache and finally passes it to the aggregate controller for processing.
+  * <p>
+  * Every failure raised in the body is handed to {@code handleError} together with the
+  * child cache entry, which is {@code null} when the failure happened before the entry
+  * was registered (for example when deserialization failed).
+  *
+  * @param childCasReferenceId  reference id of the CAS produced by the Cas Multiplier
+  * @param parentCasReferenceId reference id of the input CAS the child was generated from
+  * @throws AsynchAEException as declared; failures raised in the body are passed to
+  *                           {@code handleError}
+  */
+ private void executeRemoteRequest(String childCasReferenceId, String parentCasReferenceId)
+     throws AsynchAEException {
+   SerializationResult result = null;
+   CacheEntry childCacheEntry = null;
+   long inTime = System.nanoTime();
+
+   try {
+     CasStateEntry parentCasStateEntry = saveFreeCasEndpointInParentCasStateEntry(parentCasReferenceId);
+
+     computeStats(mc, parentCasReferenceId);
+
+     mc.getEndpoint().setDestination(null);
+
+     // This CAS came in from a CAS Multiplier. Treat it differently than the
+     // input CAS. In case the Aggregate needs to send this CAS to the
+     // client, retrieve the client destination by looking up the client endpoint
+     // using the input CAS reference id. Fetch the cache entry for the parent CAS.
+     CacheEntry parentCasCacheEntry = controller.getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+     Endpoint replyToEndpoint = parentCasCacheEntry.getMessageOrigin();
+     // The message context contains a Cas Multiplier endpoint. Since we don't want to
+     // send a generated CAS back to the CM, override it with the endpoint provided by
+     // the client of this service. The client endpoint is attached to the input CAS
+     // cache entry.
+     if (replyToEndpoint != null) {
+       mc.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+       mc.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+     }
+     // Create the local cache entry for the child CAS and associate it with its parent.
+     CasStateEntry childCasStateEntry = super.getCasStateEntry(childCasReferenceId);
+     childCasStateEntry.setParentCasReferenceId(parentCasReferenceId);
+     Delegate delegate = getLastDelegate(childCasStateEntry);
+     setLastDelegate(delegate, childCasStateEntry);
+
+     // If there is one thread receiving messages from the Cas Multiplier, increment the
+     // number of child CASes of the parent CAS here. If there are more threads
+     // (consumers), a special object, ConcurrentMessageListener, has already incremented
+     // the count. That object enforces the order of processing for CASes coming in
+     // from the Cas Multiplier.
+     if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
+       parentCasStateEntry.incrementSubordinateCasInPlayCount();
+     }
+
+     // Time how long we wait on the Cas Pool to fetch a new CAS
+     long t1 = controller.getCpuTime();
+     Exception deserializationFailure = null;
+     try {
+       result = deserializeChildCAS(delegate.getKey(), mc.getEndpoint(), mc);
+     } catch (Exception e) {
+       deserializationFailure = e;
+     }
+     if (result == null) {
+       // Without a deserialized CAS there is nothing to register in the cache.
+       // Surface the original failure instead of tripping over the null result,
+       // which previously replaced it with a NullPointerException.
+       if (deserializationFailure != null) {
+         throw deserializationFailure;
+       }
+       throw new AsynchAEException("Unable to deserialize child CAS:" + childCasReferenceId);
+     }
+
+     // Create the child CAS cache entry
+     childCacheEntry = controller.getInProcessCache().register(result.getCas(), mc,
+         result.getDeserSharedData(), result.getReuseInfo(), childCasReferenceId, result.getMarker(),
+         result.acceptsDeltaCas());
+     childCacheEntry.setInputCasReferenceId(parentCasReferenceId);
+     childCacheEntry.setFreeCasEndpoint(parentCasCacheEntry.getFreeCasEndpoint());
+     saveStats(childCacheEntry, inTime, t1, result.getTimeWaitingForCAS());
+
+     // Check and set up for a Delta CAS reply
+     if (mc.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
+       boolean acceptsDeltaCas = mc.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+       if (acceptsDeltaCas) {
+         Marker marker = result.getCas().createMarker();
+         result.setAcceptsDeltaCas(acceptsDeltaCas);
+         result.setMarker(marker);
+       }
+     }
+
+     if (controller.isStopped()) {
+       // The Controller is in shutdown state, release the CAS
+       controller.dropCAS(childCacheEntry.getCasReferenceId(), true);
+       return;
+     }
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+       UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "executeRemoteRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+           new Object[] { mc.getEndpoint().getEndpoint() });
+     }
+
+     ((AggregateAnalysisEngineController) controller).process(result.getCas(), parentCasReferenceId,
+         childCasReferenceId, delegate.getKey());
+
+   } catch (Exception e) {
+     super.handleError(e, childCacheEntry, mc);
+   }
+
+ }
+ /**
+  * Stores the given delegate as the last delegate of the child CAS state entry and
+  * registers the delegate's key as the producer of that CAS in the in-process cache.
+  *
+  * @param delegate           delegate to record
+  * @param childCasStateEntry state entry of the child CAS
+  */
+ private void setLastDelegate(Delegate delegate, CasStateEntry childCasStateEntry) {
+   childCasStateEntry.setLastDelegate(delegate);
+
+   String childCasId = childCasStateEntry.getCasReferenceId();
+   String producerKey = delegate.getKey();
+   controller.getInProcessCache().setCasProducer(childCasId, producerKey);
+ }
+ /**
+  * Looks up the delegate named by the {@code MessageFrom} property of the current
+  * message: the property value is mapped to a delegate key, and the key to a delegate.
+  *
+  * @param childCasStateEntry state entry of the child CAS (not consulted by the lookup)
+  * @return the delegate returned by the aggregate controller for the sender's key
+  * @throws AsynchAEException as declared by the message and controller lookups
+  */
+ private Delegate getLastDelegate(CasStateEntry childCasStateEntry) throws AsynchAEException {
+   String senderEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+   AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+   return aggregate.lookupDelegate(aggregate.lookUpDelegateKey(senderEndpointName));
+ }
+
+ /**
+  * Attaches a Free Cas notification endpoint to the state entry of the parent CAS.
+  * <p>
+  * The endpoint is a clone of the current message's endpoint. When the endpoint of the
+  * sending Cas Multiplier can be looked up, its server URI is copied onto the clone.
+  *
+  * @param parentCasReferenceId reference id of the parent (input) CAS
+  * @return the parent CAS state entry, now carrying the Free Cas notification endpoint
+  * @throws AsynchAEException as declared by the message and controller lookups
+  */
+ private CasStateEntry saveFreeCasEndpointInParentCasStateEntry(String parentCasReferenceId)
+     throws AsynchAEException {
+   // The sender of this message is the Cas Multiplier; resolve its delegate key and endpoint.
+   String senderEndpointName = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+   AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+   String producerKey = aggregate.lookUpDelegateKey(senderEndpointName);
+   Endpoint casMultiplierEndpoint = aggregate.lookUpEndpoint(producerKey, false);
+
+   // Work on a clone so the endpoint held by the message context is left untouched.
+   Endpoint freeCasEndpoint = (Endpoint) ((Endpoint_impl) mc.getEndpoint()).clone();
+   if (casMultiplierEndpoint != null) {
+     // The URL of the broker managing the Free Cas Notification queue is needed
+     // when a connection to that broker is established.
+     freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
+   }
+
+   CasStateEntry parentCasStateEntry = aggregate.getLocalCache().lookupEntry(parentCasReferenceId);
+   // Associate the Free Cas Notification endpoint with the input CAS
+   parentCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
+   return parentCasStateEntry;
+ }
+
+ /**
+  * Logs timing figures for a reply (at FINE level) and aggregates delegate statistics.
+  * <p>
+  * When the message carries the {@code TimeInService} property, the round-trip time,
+  * the time spent in the delegate and the time spent in communication are logged in
+  * milliseconds. Delegate statistics are aggregated in every case.
+  *
+  * @param aMessageContext message carrying the timing properties
+  * @param aCasReferenceId reference id of the CAS the statistics belong to
+  * @throws AsynchAEException if reading a message property or aggregating statistics fails
+  */
+ protected void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException {
+   if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+     long departureTime = controller.getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
+     long currentTime = System.nanoTime();
+     long roundTrip = currentTime - departureTime;
+     long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+     // Communication time is what remains of the round trip once the time spent in
+     // the service is taken out. The previous expression,
+     // currentTime - (departureTime - timeInService), added the service time instead.
+     long totalTimeInComms = roundTrip - timeInService;
+
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+       // Values are divided by 1,000,000 to report milliseconds instead of nanoseconds.
+       UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+           new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+               (double) roundTrip / (double) 1000000 });
+
+       UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+           new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
+               aMessageContext.getEndpoint() });
+
+       UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+           new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
+               aMessageContext.getEndpoint() });
+     }
+   }
+   aggregateDelegateStats(aMessageContext, aCasReferenceId);
+
+ }
+
+ /**
+  * Folds the timing properties carried by a reply message into the statistics kept for
+  * the delegate, for the CAS itself and, where applicable, for its input (parent) CAS.
+  * <p>
+  * Each timing property is optional and is only applied when present in the message.
+  * Statistics objects that cannot be resolved are skipped rather than dereferenced.
+  *
+  * @param aMessageContext message carrying the timing properties
+  * @param aCasReferenceId reference id of the CAS the statistics belong to
+  * @throws AsynchAEException if the CAS is not in the cache, or wrapping any other failure
+  */
+ protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId)
+     throws AsynchAEException {
+   try {
+     AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+     String delegateKey = aggregate.lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+     CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+     if (entry == null) {
+       throw new AsynchAEException("CasReferenceId:" + aCasReferenceId + " Not Found in the Cache.");
+     }
+     CacheEntry inputCasEntry = null;
+     String inputCasReferenceId = entry.getInputCasReferenceId();
+     ServicePerformance casStats = aggregate.getCasStatistics(aCasReferenceId);
+     if (inputCasReferenceId != null && controller.getInProcessCache().entryExists(inputCasReferenceId)) {
+       String casProducerKey = entry.getCasProducerKey();
+       if (casProducerKey != null && aggregate.isDelegateKeyValid(casProducerKey)) {
+         // Get entry for the input CAS
+         inputCasEntry = controller.getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+       }
+     }
+     ServicePerformance delegateServicePerformance = aggregate.getServicePerformance(delegateKey);
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+       long timeToSerializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
+       if (timeToSerializeCAS > 0 && delegateServicePerformance != null) {
+         delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+       }
+     }
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+       long timeToDeserializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
+       if (timeToDeserializeCAS > 0 && delegateServicePerformance != null) {
+         delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+       }
+     }
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+       long idleTime = aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime);
+       if (idleTime > 0 && delegateServicePerformance != null) {
+         Endpoint endp = aMessageContext.getEndpoint();
+         if (endp != null && endp.isRemote()) {
+           delegateServicePerformance.incrementIdleTime(idleTime);
+         }
+       }
+     }
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+       long timeWaitingForCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+       if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+         entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+         // Guarded like every other use of the delegate statistics in this method;
+         // previously an unresolved delegate caused a NullPointerException here.
+         if (delegateServicePerformance != null) {
+           delegateServicePerformance.incrementCasPoolWaitTime(
+               timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
+         }
+         if (inputCasEntry != null) {
+           inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+         }
+       }
+     }
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+       long timeInProcessCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+       Endpoint endp = aMessageContext.getEndpoint();
+       if (endp != null && endp.isRemote()) {
+         if (delegateServicePerformance != null) {
+           // The remote service returns the total time spent in analysis.
+           // Compute the delta against what has been recorded so far.
+           long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+           // increment total time in analysis
+           delegateServicePerformance.incrementAnalysisTime(dt);
+           controller.getServicePerformance().incrementAnalysisTime(dt);
+         }
+       } else {
+         controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+       }
+       // Same null guard as applied to the input CAS statistics below.
+       if (casStats != null) {
+         casStats.incrementAnalysisTime(timeInProcessCAS);
+       }
+
+       if (inputCasReferenceId != null) {
+         ServicePerformance inputCasStats = aggregate.getCasStatistics(inputCasReferenceId);
+         // Update processing time for the input CAS
+         if (inputCasStats != null) {
+           inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+         }
+       }
+
+     }
+   } catch (AsynchAEException e) {
+     throw e;
+   } catch (Exception e) {
+     throw new AsynchAEException(e);
+   }
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.util.Level;
+
+public class ProcessChildCasResponseCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ /**
+ * Creates a command bound to the given message and controller.
+ *
+ * @param mc message context this command operates on
+ * @param controller controller passed on to the superclass constructor
+ */
+ public ProcessChildCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+ /**
+ * Entry point of the command. Currently a no-op: the whole implementation below is
+ * commented out, so calling this method has no effect and none of the private
+ * request handlers of this class is invoked from here.
+ *
+ * @throws Exception declared by the signature; nothing is thrown by the current body
+ */
+ public void execute() throws Exception {
+// System.out.println(">>>>>>>>>>>>>>> in ProcessChildCasResponseCommand.execute(");
+// int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+// String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
+// System.out.println(">>>>>>>>>>>>>>> Controller:"+controller.getComponentName()+
+// " in ProcessChildCasResponseCommand.execute() - Child CAS:"+casReferenceId+
+// " from "+mc
+// .getMessageStringProperty(AsynchAEMessage.MessageFrom)
+// );
+//
+// if (casReferenceId == null) {
+// // LOG THIS
+// System.out.println(
+// "ProcessInputCasResponseCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+// return; // Nothing to do
+// }
+// // Save Process command in the client endpoint.
+// Endpoint clientEndpoint = controller.getClientEndpoint();
+// if (clientEndpoint != null) {
+// clientEndpoint.setCommand(AsynchAEMessage.Process);
+// }
+//
+// // Cas was passed by reference meaning InProcessCache must have it
+// if (AsynchAEMessage.CASRefID == payload) {
+// executeDirectRequest(casReferenceId);
+// } else {
+// executeRemoteRequest(casReferenceId);
+// }
+ }
+ /**
+  * Handles a reply for a CAS that was passed by reference and therefore is expected to
+  * be in the in-process cache.
+  * <p>
+  * The CAS reference id is re-read from the message, the replying delegate is resolved
+  * (falling back to the {@code MessageFrom} property when the endpoint name is missing
+  * or blank), the reply is recorded on the CAS state entry and the CAS is removed from
+  * the delegate's outstanding list. Failures are passed to the controller's error
+  * handler chain. The delegate's process count is incremented in every case.
+  *
+  * @param casReferenceId reference id supplied by the caller; it is replaced by the
+  *                       value of the {@code CasReference} message property
+  * @throws AsynchAEException as declared; failures raised in the body are passed to the
+  *                           error handler chain
+  */
+ private void executeDirectRequest(String casReferenceId) throws AsynchAEException {
+   CacheEntry cacheEntry = null;
+
+   try {
+     casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+     cacheEntry = controller.getInProcessCache().getCacheEntryForCAS(casReferenceId);
+     CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) controller)
+         .getLocalCache().lookupEntry(casReferenceId);
+
+     CAS cas = cacheEntry.getCas();
+     String delegateKey = null;
+     if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) {
+       // No usable endpoint name on the message; identify the delegate by its sender name.
+       String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+       delegateKey = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(fromEndpoint);
+     } else {
+       delegateKey = ((AggregateAnalysisEngineController) controller)
+           .lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+     }
+     Delegate delegate = ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey);
+     if (casStateEntry != null) {
+       casStateEntry.setReplyReceived();
+       casStateEntry.setLastDelegate(delegate);
+     }
+     delegate.removeCasFromOutstandingList(casReferenceId);
+
+     if (cas != null) {
+       cancelTimerAndProcess(mc, casReferenceId, cas);
+     } else {
+       if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+         UIMAFramework.getLogger(getClass()).logrb(
+             Level.INFO,
+             getClass().getName(),
+             "handleProcessResponseWithCASReference",
+             UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+             "UIMAEE_cas_not_in_cache__INFO",
+             new Object[] { controller.getName(), casReferenceId,
+                 mc.getEndpoint().getEndpoint() });
+       }
+       throw new AsynchAEException("CAS with Reference Id:" + casReferenceId
+           + " Not Found in CasManager's CAS Cache");
+     }
+   } catch (Exception e) {
+
+     ErrorContext errorContext = new ErrorContext();
+     errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+     errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
+     errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+     controller.getErrorHandlerChain().handle(e, errorContext, controller);
+   } finally {
+     incrementDelegateProcessCount(mc);
+     if (controller instanceof AggregateAnalysisEngineController) {
+       try {
+         String endpointName = mc.getEndpoint().getEndpoint();
+         String delegateKey = ((AggregateAnalysisEngineController) controller)
+             .lookUpDelegateKey(endpointName);
+         if (delegateKey != null) {
+           Endpoint endpoint = ((AggregateAnalysisEngineController) controller)
+               .lookUpEndpoint(delegateKey, false);
+
+           // Check if the multiplier aborted during processing of this input CAS.
+           // cacheEntry is still null when the failure above happened before the cache
+           // lookup completed; the guard avoids a secondary NullPointerException that
+           // would only be logged as a misleading warning.
+           if (endpoint != null && endpoint.isCasMultiplier() && cacheEntry != null
+               && cacheEntry.isAborted() && !controller.getInProcessCache().isEmpty()) {
+             controller.getInProcessCache().registerCallbackWhenCacheEmpty(
+                 controller.getEventListener());
+           }
+           // NOTE(review): when the cache is already empty nothing is done. The original
+           // code carried a disabled call to getEventListener().onCacheEmpty() here with
+           // an open question whether it is needed for an aborted CAS.
+         }
+       } catch (Exception e) {
+         if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+           UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+               "handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+               "UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+           UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(),
+               "handleProcessResponseWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+               "UIMAEE_exception__WARNING", e);
+         }
+       }
+     }
+   }
+
+ }
+ /**
+  * Increments the {@code ProcessCount} statistic of the delegate identified by the
+  * endpoint of the given message. Does nothing when the message has no endpoint or the
+  * controller is not an aggregate. Any failure is logged at INFO level and otherwise
+  * ignored.
+  *
+  * @param aMessageContext message whose endpoint identifies the delegate
+  */
+ private void incrementDelegateProcessCount(MessageContext aMessageContext) {
+   Endpoint endpoint = aMessageContext.getEndpoint();
+   if (endpoint == null || !(controller instanceof AggregateAnalysisEngineController)) {
+     return;
+   }
+   try {
+     String delegateKey = ((AggregateAnalysisEngineController) controller)
+         .lookUpDelegateKey(endpoint.getEndpoint());
+     controller.getMonitor().getLongNumericStatistic(delegateKey, Monitor.ProcessCount).increment();
+   } catch (Exception e) {
+     // Best effort only: a failure to update the counter is reported and not propagated.
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+       UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+           "incrementDelegateProcessCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+           "UIMAEE_delegate_key_for_endpoint_not_found__INFO", new Object[] { controller.getComponentName(), endpoint.getEndpoint() });
+     }
+   }
+ }
+ /**
+ * Placeholder for handling a reply whose CAS was not passed by reference.
+ * The body is empty, so the method currently does nothing.
+ *
+ * @param casReferenceId reference id of the CAS (unused)
+ * @throws AsynchAEException declared by the signature; never thrown by the empty body
+ */
+ private void executeRemoteRequest(String casReferenceId) throws AsynchAEException {
+
+ }
+ /**
+ * Currently a no-op: the statistics computation and the process calls that made up
+ * the body are commented out, so none of the arguments is used.
+ *
+ * @param aMessageContext message the reply arrived in (unused)
+ * @param aCasReferenceId reference id of the CAS (unused)
+ * @param aCAS the CAS taken from the cache (unused)
+ * @throws AsynchAEException declared by the signature; never thrown by the current body
+ */
+ private void cancelTimerAndProcess(MessageContext aMessageContext, String aCasReferenceId,
+ CAS aCAS) throws AsynchAEException {
+// computeStats(aMessageContext, aCasReferenceId);
+// ((AggregateAnalysisEngineController) controller).process(aCAS, anInputCasReferenceId,
+// aNewCasReferenceId, aNewCasProducedBy);
+//
+// super.invokeProcess(aCAS, aCasReferenceId, null, aMessageContext, null);
+
+ }
+ /**
+  * Logs timing figures for a reply (at FINE level) and, when the controller is an
+  * aggregate, aggregates delegate statistics.
+  * <p>
+  * When the message carries the {@code TimeInService} property, the round-trip time,
+  * the time spent in the delegate and the time spent in communication are logged in
+  * milliseconds.
+  *
+  * @param aMessageContext message carrying the timing properties
+  * @param aCasReferenceId reference id of the CAS the statistics belong to
+  * @throws AsynchAEException if reading a message property or aggregating statistics fails
+  */
+ protected void computeStats(MessageContext aMessageContext, String aCasReferenceId)
+     throws AsynchAEException {
+   if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+     long departureTime = controller.getTime(aCasReferenceId,
+         aMessageContext.getEndpoint().getEndpoint());
+     long currentTime = System.nanoTime();
+     long roundTrip = currentTime - departureTime;
+     long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+     // Communication time is what remains of the round trip once the time spent in
+     // the service is taken out. The previous expression,
+     // currentTime - (departureTime - timeInService), added the service time instead.
+     long totalTimeInComms = roundTrip - timeInService;
+
+     if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+       // Values are divided by 1,000,000 to report milliseconds instead of nanoseconds.
+       UIMAFramework.getLogger(getClass()).logrb(
+           Level.FINE,
+           getClass().getName(),
+           "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+           "UIMAEE_show_roundtrip_time__FINE",
+           new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),
+               (double) roundTrip / (double) 1000000 });
+
+       UIMAFramework.getLogger(getClass()).logrb(
+           Level.FINE,
+           getClass().getName(),
+           "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+           "UIMAEE_show_time_spent_in_delegate__FINE",
+           new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000,
+               aMessageContext.getEndpoint() });
+
+       UIMAFramework.getLogger(getClass()).logrb(
+           Level.FINE,
+           getClass().getName(),
+           "computeStats",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+           "UIMAEE_show_time_spent_in_comms__FINE",
+           new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000,
+               aMessageContext.getEndpoint() });
+     }
+   }
+
+   if (controller instanceof AggregateAnalysisEngineController) {
+     aggregateDelegateStats(aMessageContext, aCasReferenceId);
+   }
+ }
+ /**
+  * Folds the timing properties carried by a reply message into the statistics kept for
+  * the delegate, for the CAS itself and, where applicable, for its input (parent) CAS.
+  * <p>
+  * The delegate is identified by the endpoint name of the message, or by the
+  * {@code MessageFrom} property when that name is missing or blank. Each timing
+  * property is optional and is only applied when present in the message. Statistics
+  * objects that cannot be resolved are skipped rather than dereferenced.
+  *
+  * @param aMessageContext message carrying the timing properties
+  * @param aCasReferenceId reference id of the CAS the statistics belong to
+  * @throws AsynchAEException if the CAS is not in the cache, or wrapping any other failure
+  */
+ protected synchronized void aggregateDelegateStats(MessageContext aMessageContext,
+     String aCasReferenceId) throws AsynchAEException {
+   try {
+     AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+     String delegateKey;
+     if (aMessageContext.getEndpoint().getEndpoint() == null
+         || aMessageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+       // No usable endpoint name on the message; identify the delegate by its sender name.
+       String fromEndpoint = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+       delegateKey = aggregate.lookUpDelegateKey(fromEndpoint);
+     } else {
+       delegateKey = aggregate.lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+     }
+     CacheEntry entry = controller.getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+     if (entry == null) {
+       throw new AsynchAEException("CasReferenceId:" + aCasReferenceId
+           + " Not Found in the Cache.");
+     }
+     CacheEntry inputCasEntry = null;
+     String inputCasReferenceId = entry.getInputCasReferenceId();
+     ServicePerformance casStats = aggregate.getCasStatistics(aCasReferenceId);
+     if (inputCasReferenceId != null
+         && controller.getInProcessCache().entryExists(inputCasReferenceId)) {
+       String casProducerKey = entry.getCasProducerKey();
+       if (casProducerKey != null && aggregate.isDelegateKeyValid(casProducerKey)) {
+         // Get entry for the input CAS
+         inputCasEntry = controller.getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+       }
+     }
+     ServicePerformance delegateServicePerformance = aggregate.getServicePerformance(delegateKey);
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+       long timeToSerializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS);
+       if (timeToSerializeCAS > 0 && delegateServicePerformance != null) {
+         delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+       }
+     }
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+       long timeToDeserializeCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS);
+       if (timeToDeserializeCAS > 0 && delegateServicePerformance != null) {
+         delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+       }
+     }
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+       long idleTime = aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime);
+       if (idleTime > 0 && delegateServicePerformance != null) {
+         Endpoint endp = aMessageContext.getEndpoint();
+         if (endp != null && endp.isRemote()) {
+           delegateServicePerformance.incrementIdleTime(idleTime);
+         }
+       }
+     }
+
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+       long timeWaitingForCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS);
+       if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+         entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+         // Guarded like every other use of the delegate statistics in this method;
+         // previously an unresolved delegate caused a NullPointerException here.
+         if (delegateServicePerformance != null) {
+           delegateServicePerformance.incrementCasPoolWaitTime(timeWaitingForCAS
+               - delegateServicePerformance.getRawCasPoolWaitTime());
+         }
+         if (inputCasEntry != null) {
+           inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+         }
+       }
+     }
+     if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+       long timeInProcessCAS = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS);
+       Endpoint endp = aMessageContext.getEndpoint();
+       if (endp != null && endp.isRemote()) {
+         if (delegateServicePerformance != null) {
+           // The remote service returns the total time spent in analysis.
+           // Compute the delta against what has been recorded so far.
+           long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+           // increment total time in analysis
+           delegateServicePerformance.incrementAnalysisTime(dt);
+           controller.getServicePerformance().incrementAnalysisTime(dt);
+         }
+       } else {
+         controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+       }
+       // Same null guard as applied to the input CAS statistics below.
+       if (casStats != null) {
+         casStats.incrementAnalysisTime(timeInProcessCAS);
+       }
+
+       if (inputCasReferenceId != null) {
+         ServicePerformance inputCasStats = aggregate.getCasStatistics(inputCasReferenceId);
+         // Update processing time for the input CAS
+         if (inputCasStats != null) {
+           inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+         }
+       }
+
+     }
+   } catch (AsynchAEException e) {
+     throw e;
+   } catch (Exception e) {
+     throw new AsynchAEException(e);
+   }
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.InProcessCache.UndefinedCacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.util.Level;
+
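+/**
+ * Handles a Process request that carries an input CAS. A request whose payload is a CAS
+ * reference id is served from the entry already present in the InProcessCache; any other
+ * payload is deserialized and registered in the cache before it is processed.
+ */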
+public class ProcessInputCasRequestCommand extends AbstractUimaAsCommand {
+ /** Class token used to obtain the UIMA logger and as the source class name in log records. */
+ private static final Class<?> CLASS_NAME = ProcessInputCasRequestCommand.class;
+
+ /** Context of the request message this command was created for; assigned once in the constructor. */
+ private final MessageContext mc;
+
+ // Intended to control access to the aggregate's semaphore, which throttles
+ // processing of CASes from a service input queue.
+ // NOTE(review): this monitor is an instance field, so it only excludes threads that
+ // share this command object; separate command instances do not contend on it.
+ private final Object lock = new Object();
+
+ /**
+  * Creates a command bound to a single incoming Process request.
+  *
+  * @param mc         context of the request message; kept for the lifetime of the command
+  * @param controller controller passed on to the {@code AbstractUimaAsCommand} constructor
+  */
+ public ProcessInputCasRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+   super(controller);
+   this.mc = mc;
+ }
+
+ /**
+  * Dispatches the Process request held by this command.
+  * <p>
+  * The request is ignored when the message carries no CAS reference id. Otherwise the Process
+  * command is recorded on the controller's client endpoint (when one exists) and the request
+  * is routed by payload type: a {@code CASRefID} payload goes to
+  * {@link #executeDirectRequest(String)}, every other payload to
+  * {@link #executeRemoteRequest(String)}.
+  * <p>
+  * Diagnostics are written through the UIMA logger rather than to standard output.
+  *
+  * @throws Exception if reading the message properties or handling the request fails
+  */
+ public void execute() throws Exception {
+
+   int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+   String inputCasReferenceId = super.getCasReferenceId(this.getClass(), mc);
+   if (inputCasReferenceId == null) {
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+       UIMAFramework.getLogger(CLASS_NAME).log(Level.WARNING,
+           "ProcessInputCasRequestCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+     }
+     return; // Nothing to do
+   }
+   // Save Process command in the client endpoint.
+   Endpoint clientEndpoint = controller.getClientEndpoint();
+   if (clientEndpoint != null) {
+     clientEndpoint.setCommand(AsynchAEMessage.Process);
+   }
+   // Per-request trace; only emitted when FINE logging is enabled
+   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+     UIMAFramework.getLogger(CLASS_NAME).log(Level.FINE,
+         "Controller:" + controller.getComponentName() + " ProcessInputCasRequestCommand.execute()");
+   }
+   // CAS was passed by reference meaning the global InProcessCache must have it. The client
+   // created the entry for it already.
+   if (AsynchAEMessage.CASRefID == payload) {
+     executeDirectRequest(inputCasReferenceId);
+   } else {
+     executeRemoteRequest(inputCasReferenceId);
+   }
+ }
+
+ /**
+  * Blocks the calling thread on the controller latch until the controller is initialized,
+  * unless the given cache entry is flagged as a warm-up CAS.
+  *
+  * @param casCacheEntry cache entry of the CAS about to be processed
+  * @throws AsynchAEException propagated from the latch wait
+  */
+ private void blockIfControllerNotReady(CacheEntry casCacheEntry) throws AsynchAEException {
+   // warm-up CASes are let through without waiting for initialization
+   if (casCacheEntry.isWarmUp()) {
+     return;
+   }
+   controller.getControllerLatch().waitUntilInitialized();
+ }
+
+ /**
+  * Copies the {@code ReplyToEndpoint} message property onto the message's endpoint as its
+  * reply destination. The original author marked this step as a hack for remembering the true
+  * reply-to endpoint of the service that sent the request.
+  *
+  * @throws AsynchAEException if the message property cannot be read
+  */
+ private void saveReplyTo() throws AsynchAEException {
+   // read the property before touching the endpoint, as the original code did
+   Object sender = mc.getMessageObjectProperty(AsynchAEMessage.ReplyToEndpoint);
+   Endpoint origin = mc.getEndpoint();
+   origin.setReplyDestination(sender);
+ }
+ /**
+  * Stores the delegate key on the message's endpoint. The key is taken from the
+  * {@code DelegateKey} message property, falling back to {@code MessageFrom} when that
+  * property is absent.
+  *
+  * @throws AsynchAEException if a message property cannot be read
+  */
+ private void saveDelegateKey() throws AsynchAEException {
+   String primary = mc.getMessageStringProperty(AsynchAEMessage.DelegateKey);
+   String resolved = (primary != null)
+       ? primary
+       : mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+   mc.getEndpoint().setDelegateKey(resolved);
+ }
+ /**
+  * Stores the endpoint name on the message's endpoint. The name is taken from the
+  * {@code EndpointName} message property, falling back to {@code MessageFrom} when that
+  * property is absent.
+  *
+  * @throws AsynchAEException if a message property cannot be read
+  */
+ private void saveEndpointName() throws AsynchAEException {
+   String primary = mc.getMessageStringProperty(AsynchAEMessage.EndpointName);
+   String resolved = (primary != null)
+       ? primary
+       : mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+   mc.getEndpoint().setEndpoint(resolved);
+ }
+ /**
+  * Records the message's endpoint as the origin of the given CAS on an aggregate controller.
+  * Does nothing when the controller is a primitive.
+  *
+  * @param inputCasCacheEntry cache entry whose CAS reference id keys the origin
+  */
+ private void addMessageOrigin(CacheEntry inputCasCacheEntry) {
+   if (controller.isPrimitive()) {
+     return;
+   }
+   AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController) controller;
+   aggregate.addMessageOrigin(inputCasCacheEntry.getCasReferenceId(), mc.getEndpoint());
+ }
+ /**
+  * Handles a request whose CAS was passed by reference, so the InProcessCache is expected to
+  * hold an entry for it already.
+  * <p>
+  * The request is ignored when the cache has no entry for the id. Otherwise the method waits
+  * for controller initialization (unless the CAS is a warm-up CAS), records the arrival time,
+  * stores reply-to, delegate key and endpoint name on the message's endpoint, binds the
+  * endpoint to the CAS in the local cache and hands the CAS to {@link #process}.
+  * <p>
+  * {@code Monitor.ProcessCount} is incremented only inside {@code process()}; it is not
+  * incremented here as well, so a request is counted once. Failures are passed to the
+  * controller's error handler chain as an {@code AsynchAEException} (wrapped only when needed)
+  * and logged at WARNING. The count of CASes processed by the service is incremented on every
+  * exit path, including the ignored and stopped cases.
+  *
+  * @param inputCasReferenceId id of the CAS to process
+  * @throws AsynchAEException if the error handler chain itself fails
+  */
+ private void executeDirectRequest(String inputCasReferenceId) throws AsynchAEException {
+   try {
+     // this is a request from a colocated client, the InProcessCache
+     // must have a entry for a given casReferenceId
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "executeDirectRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+           new Object[] { inputCasReferenceId });
+     }
+     CacheEntry inputCasCacheEntry = super.getCacheEntryForCas(inputCasReferenceId);
+     if (inputCasCacheEntry instanceof UndefinedCacheEntry) {
+       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+         UIMAFramework.getLogger(CLASS_NAME).log(Level.WARNING,
+             "Controller:" + controller.getComponentName()
+                 + " ProcessInputCasRequestCommand.execute() - Cache Entry for CasReferenceId:"
+                 + inputCasReferenceId + " Not Found in InProcessCache - Ignoring Message");
+       }
+       return; // Nothing to do
+     }
+
+     blockIfControllerNotReady(inputCasCacheEntry);
+     long arrivalTime = System.nanoTime();
+     controller.saveTime(arrivalTime, inputCasCacheEntry.getCasReferenceId(), controller.getName());
+
+     saveReplyTo();
+     saveDelegateKey();
+     saveEndpointName();
+     addMessageOrigin(inputCasCacheEntry);
+
+     // Create a CasStateEntry in a local cache
+     CasStateEntry localStateEntry = getCasStateEntry(inputCasCacheEntry.getCasReferenceId());
+     // associate client endpoint with the input CAS. We need to reply to this client
+     localStateEntry.setClientEndpoint(mc.getEndpoint());
+     localStateEntry.setInputCasReferenceId(inputCasCacheEntry.getCasReferenceId());
+
+     if (controller.isStopped()) {
+       return;
+     }
+     // *****************************************************************
+     // Process the CAS. process() increments Monitor.ProcessCount, so the
+     // counter must not be bumped here as well.
+     // *****************************************************************
+     process(inputCasCacheEntry, false);
+
+   } catch (Exception e) {
+     // The error handler chain receives an AsynchAEException; wrap only foreign exceptions
+     Exception reported = (e instanceof AsynchAEException) ? e : new AsynchAEException(e);
+     controller.getErrorHandlerChain().handle(reported, super.populateErrorContext(mc), controller);
+
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "executeDirectRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", reported);
+     }
+   } finally {
+     // Increment number of CASes processed by this service
+     controller.getServicePerformance().incrementNumberOfCASesProcessed();
+   }
+
+ }
+
+ /**
+  * Handles a request whose CAS arrives serialized in the message.
+  * <p>
+  * A request for a CAS id that is already in the InProcessCache is treated as a duplicate and
+  * dropped; {@link #isCasInTheCacheAlready(String)} reports it, so it is not reported again
+  * here. Otherwise the CAS is deserialized, registered in the cache, its statistics are saved,
+  * and it is handed to {@link #process}. For an aggregate controller the throttling semaphore
+  * is attached to the cache entry, and the calling thread waits for completion when the CAS is
+  * not a subordinate. If the controller is stopped, the CAS is dropped instead of processed.
+  * Any exception is delegated to {@code handleError} of the superclass.
+  *
+  * @param inputCasReferenceId id of the CAS carried by the request
+  * @throws AsynchAEException declared for callers; failures inside the method body are routed
+  *                           to {@code handleError}
+  */
+ private void executeRemoteRequest(String inputCasReferenceId) throws AsynchAEException {
+
+   CacheEntry inputCasCacheEntry = null;
+
+   try {
+     // The following applies to input CAS from remote clients only.
+     // To prevent processing multiple messages with the same CasReferenceId, check
+     // the CAS cache to see if the message with a given CasReferenceId is already
+     // being processed. If it is, the message contains the same request possibly
+     // issued by the caller due to a timeout. Also this mechanism helps with
+     // dealing with scenario when this service is not up when the client sends
+     // request. The client can keep re-sending the same request until its timeout
+     // thresholds are exceeded. By that time, there may be multiple messages in
+     // this service queue with the same CasReferenceId. When the service finally
+     // comes back up, it will have multiple messages in its queue possibly from
+     // the same client. Only the first message for any given CasReferenceId
+     // should be processed.
+     if (isCasInTheCacheAlready(inputCasReferenceId)) {
+       // isCasInTheCacheAlready() has already reported the duplicate request
+       return; // Nothing to do
+     }
+     long inTime = System.nanoTime();
+
+     SerializationResult result = super.deserializeInputCAS(mc);
+
+     // CPU time snapshot handed to saveStats() together with the arrival time
+     long t1 = controller.getCpuTime();
+
+     inputCasCacheEntry = controller.getInProcessCache().register(result.getCas(), mc, result.getDeserSharedData(),
+         result.getReuseInfo(), inputCasReferenceId, result.getMarker(), result.acceptsDeltaCas());
+
+     saveStats(inputCasCacheEntry, inTime, t1, result.getTimeWaitingForCAS());
+
+     boolean waitForCompletion = false;
+
+     // create an entry for the CAS in a local cache
+     CasStateEntry cse = getCasStateEntry(inputCasReferenceId);
+
+     if (!controller.isPrimitive()) {
+       addCompletionSemaphore(inputCasCacheEntry);
+       // only a non-subordinate CAS makes the receiving thread wait for completion
+       if (cse != null && !cse.isSubordinate()) {
+         waitForCompletion = true;
+       }
+     }
+
+     addMessageOrigin(inputCasCacheEntry);
+
+     if (controller.isStopped()) {
+       controller.dropCAS(inputCasCacheEntry.getCasReferenceId(), true);
+       return;
+     }
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "executeRemoteRequest",
+           UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+           new Object[] { mc.getEndpoint().getEndpoint() });
+     }
+     process(inputCasCacheEntry, waitForCompletion);
+
+   } catch (Exception e) {
+     super.handleError(e, inputCasCacheEntry, mc);
+   }
+
+ }
+
+ /**
+  * Counts the request and hands the CAS to the controller.
+  * <p>
+  * {@code Monitor.ProcessCount} is incremented under the empty-string key. A primitive
+  * controller receives the CAS together with the message's endpoint. An aggregate controller
+  * receives only the CAS and its id; afterwards, when {@code waitForCompletion} is set, the
+  * calling thread blocks on the completion semaphore attached to the cache entry.
+  *
+  * @param entry             cache entry holding the CAS and its reference id
+  * @param waitForCompletion whether to block after dispatching to an aggregate controller
+  */
+ private void process(CacheEntry entry, boolean waitForCompletion) {
+   controller.getMonitor().incrementCount("", Monitor.ProcessCount);
+
+   if (!controller.isPrimitive()) {
+     // Aggregate: the CAS has been handed off to a delegate. Optionally hold the receiving
+     // thread until the aggregate is done with the CAS or the controller stops.
+     controller.process(entry.getCas(), entry.getCasReferenceId());
+     if (waitForCompletion) {
+       waitForCompletionSemaphore(entry);
+     }
+     return;
+   }
+
+   // Primitive: process in place, replying to the endpoint of the request message
+   controller.process(entry.getCas(), entry.getCasReferenceId(), mc.getEndpoint());
+ }
+
+ /**
+  * Blocks the calling thread until a permit is acquired from the completion semaphore attached
+  * to the given cache entry, the entry no longer has a semaphore, or the controller is stopped.
+  * <p>
+  * The semaphore is polled with a 500 ms timeout so that a controller stop is noticed. If the
+  * thread is interrupted while waiting, the wait ends and the thread's interrupt status is
+  * restored so callers further up the stack can observe it.
+  *
+  * @param entry cache entry whose completion semaphore is awaited
+  */
+ private void waitForCompletionSemaphore(CacheEntry entry) {
+   try {
+     synchronized (lock) {
+       while (!controller.isStopped()) {
+         // read the semaphore once per iteration so the null check and the acquire
+         // operate on the same reference
+         Semaphore completion = entry.getThreadCompletionSemaphore();
+         if (completion == null || completion.tryAcquire(500, TimeUnit.MILLISECONDS)) {
+           break;
+         }
+       }
+     }
+   } catch (InterruptedException ex) {
+     // stop waiting, but do not lose the interrupt request
+     Thread.currentThread().interrupt();
+   }
+
+ }
+
+
+
+ /**
+  * Attaches the aggregate controller's throttling semaphore to the given cache entry, creating
+  * the semaphore on first use with {@code serviceCasPoolSize - 1} permits.
+  * <p>
+  * The semaphore is stored on the controller, not on this command. Its lazy creation is
+  * therefore guarded by a monitor common to all command instances (the class object); a
+  * per-instance monitor would let two commands running on different threads both observe
+  * {@code null} and install different semaphores.
+  * <p>
+  * Must only be called when the controller is an {@code AggregateAnalysisEngineController_impl}.
+  *
+  * @param entry cache entry of the input CAS
+  */
+ private void addCompletionSemaphore(CacheEntry entry) {
+   AggregateAnalysisEngineController_impl aggregate = (AggregateAnalysisEngineController_impl) controller;
+
+   synchronized (ProcessInputCasRequestCommand.class) {
+     // lazily create a Semaphore on the first Process request. This semaphore
+     // will throttle ingestion of CASes from service input queue.
+     if (aggregate.semaphore == null) {
+       aggregate.semaphore = new Semaphore(
+           ((AggregateAnalysisEngineController) controller).getServiceCasPoolSize() - 1);
+     }
+     // hand out the reference while still holding the monitor that guarded its creation
+     entry.setThreadCompletionSemaphore(aggregate.semaphore);
+   }
+ }
+
+ /**
+  * Tells whether the InProcessCache already holds an entry for the given CAS id, and reports a
+  * duplicate request when it does.
+  * <p>
+  * The cache lookup yields an {@code UndefinedCacheEntry} when the id is unknown; any other
+  * entry counts as a hit. On a hit the duplicate is logged at INFO (when enabled) and also
+  * written to standard output.
+  *
+  * @param casReferenceId id of the CAS carried by the request
+  * @return {@code true} if an entry for the id exists, {@code false} otherwise
+  */
+ private boolean isCasInTheCacheAlready(String casReferenceId) {
+   boolean unknownCas = super.getCacheEntryForCas(casReferenceId) instanceof UndefinedCacheEntry;
+   if (unknownCas) {
+     return false;
+   }
+
+   if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "executeRemoteRequest",
+         UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
+         new Object[] { casReferenceId });
+   }
+   System.out.println(
+       "ProcessInputCasRequestCommand.executeRemoteRequest() - Duplicate request recv'd for CasReferenceId:"
+           + casReferenceId + " - Ignoring Message");
+   return true;
+ }
+}