You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2018/02/26 18:54:13 UTC
svn commit: r1825401 [10/11] - in /uima/uima-as/branches/uima-as-3:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/
uimaj-as-activemq/src/main/java/org/apache/uim...
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
+import org.apache.uima.util.Level;
+
+public class ProcessInputCasResponseCommand extends AbstractUimaAsCommand {
+
+ private MessageContext mc;
+
+ public ProcessInputCasResponseCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+
+ public void execute() throws Exception {
+
+ int payload = mc.getMessageIntProperty(AsynchAEMessage.Payload);
+ String casReferenceId = super.getCasReferenceId(this.getClass(), mc);
+ System.out.println(">>>>>>>>>>>>>>> Controller:" + controller.getComponentName()
+ + " in ProcessInputCasResponseCommand.execute() - Input CAS:" + casReferenceId + " from "
+ + mc.getMessageStringProperty(AsynchAEMessage.MessageFrom));
+
+ if (casReferenceId == null) {
+ // LOG THIS
+ System.out.println(
+ "ProcessInputCasResponseCommand.execute() - CasReferenceId is missing in MessageContext - Ignoring Message");
+ return; // Nothing to do
+ }
+ // Cas was passed by reference meaning InProcessCache must have it
+ if (AsynchAEMessage.CASRefID == payload) {
+ executeDirectRequest(casReferenceId);
+ } else {
+ executeRemoteRequest(casReferenceId);
+ }
+
+ }
+
+ /**
+ * Handles process CAS reply from a delegate colocated in the same process
+ *
+ * @param casReferenceId
+ * @throws AsynchAEException
+ */
+ private void executeDirectRequest(String casReferenceId) throws AsynchAEException {
+ CacheEntry cacheEntry = null;
+ Delegate delegate = null;
+ try {
+ casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ // find an entry for a given cas id in a global cache
+ cacheEntry = super.getCacheEntryForCas(casReferenceId);
+ if (cacheEntry == null) {
+ throw new AsynchAEException("CasReferenceId:" + casReferenceId + " Not Found in the Cache.");
+ }
+
+ // find an entry for a given cas id in this aggregate's local cache
+ CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
+ // find delegate which sent the reply
+ delegate = super.getDelegate(mc);
+ if (casStateEntry != null) {
+ casStateEntry.setReplyReceived();
+ casStateEntry.setLastDelegate(delegate);
+ }
+ // each delegate object manages a list of CAS ids that were
+ // dispatched. Every time a CAS reply is received, we remove
+ // its CAS id from the outstanding list.
+ delegate.removeCasFromOutstandingList(casReferenceId);
+
+ if (cacheEntry.getCas() != null) {
+ computeStats(mc, cacheEntry);
+ ((AggregateAnalysisEngineController) controller).process(cacheEntry.getCas(), casReferenceId);
+
+ } else {
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_not_in_cache__INFO",
+ new Object[] { controller.getName(), casReferenceId, mc.getEndpoint().getEndpoint() });
+ }
+ throw new AsynchAEException(
+ "CAS with Reference Id:" + casReferenceId + " Not Found in CasManager's CAS Cache");
+ }
+ } catch (Exception e) {
+ super.handleError(e, cacheEntry, mc);
+ } finally {
+ incrementDelegateProcessCount(mc);
+ if (delegate != null) {
+ handleAbortedCasMultiplier(delegate, cacheEntry);
+ }
+ }
+
+ }
+
+ private void blockIfControllerNotReady(CacheEntry casCacheEntry) throws AsynchAEException {
+ if (!casCacheEntry.isWarmUp()) {
+ controller.getControllerLatch().waitUntilInitialized();
+ }
+
+ }
+
+ private void executeRemoteRequest(String casReferenceId) {
+
+ }
+
+ private void handleAbortedCasMultiplier(Delegate delegate, CacheEntry cacheEntry) {
+ try {
+ // Check if the multiplier aborted during processing of this input CAS
+ if (delegate.getEndpoint() != null && delegate.getEndpoint().isCasMultiplier() && cacheEntry.isAborted()) {
+ if (!((AggregateAnalysisEngineController) controller).getInProcessCache().isEmpty()) {
+ ((AggregateAnalysisEngineController) controller).getInProcessCache()
+ .registerCallbackWhenCacheEmpty(controller.getEventListener());
+ } else {
+ // Callback to notify that the cache is empty
+
+ // !!!!!!!!!!!!!!! WHY DO WE NEED TO CALL onCacheEmpty() IF CAS IS ABORTED?
+ // !!!!!!!!!!!!!!!!!!!!!! ?????????????????????????????????
+ // getController().getEventListener().onCacheEmpty();
+ }
+ }
+
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+ controller.getComponentName());
+
+ UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "executeDirectRequest",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+
+ private void incrementDelegateProcessCount(MessageContext aMessageContext) {
+ Endpoint endpoint = aMessageContext.getEndpoint();
+ if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
+ try {
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpoint.getEndpoint());
+ LongNumericStatistic stat = controller.getMonitor().getLongNumericStatistic(delegateKey,
+ Monitor.ProcessCount);
+ stat.increment();
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(),
+ "incrementDelegateProcessCount", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_delegate_key_for_endpoint_not_found__INFO",
+ new Object[] { controller.getComponentName(), endpoint.getEndpoint() });
+ }
+ }
+ }
+
+ }
+
+ protected void computeStats(MessageContext aMessageContext, CacheEntry cacheEntry) throws AsynchAEException {
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService)) {
+ long departureTime = controller.getTime(cacheEntry.getCasReferenceId(),
+ aMessageContext.getEndpoint().getEndpoint());
+ long currentTime = System.nanoTime();
+ long roundTrip = currentTime - departureTime;
+ long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+ long totalTimeInComms = currentTime - (departureTime - timeInService);
+
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+ new Object[] { cacheEntry.getCasReferenceId(), aMessageContext.getEndpoint(),
+ (double) roundTrip / (double) 1000000 });
+
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+ new Object[] { cacheEntry.getCasReferenceId(), (double) timeInService / (double) 1000000,
+ aMessageContext.getEndpoint() });
+
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(), "computeStats",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+ new Object[] { cacheEntry.getCasReferenceId(), (double) totalTimeInComms / (double) 1000000,
+ aMessageContext.getEndpoint() });
+ }
+ }
+
+ if (controller instanceof AggregateAnalysisEngineController) {
+ aggregateDelegateStats(aMessageContext, cacheEntry);
+ }
+ }
+
+ protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, CacheEntry cacheEntry)
+ throws AsynchAEException {
+ String delegateKey = "";
+ try {
+
+ if (aMessageContext.getEndpoint().getEndpoint() == null
+ || aMessageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+ String fromEndpoint = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ delegateKey = ((AggregateAnalysisEngineController) controller).lookUpDelegateKey(fromEndpoint);
+ } else {
+ delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ }
+ CacheEntry parentCasEntry = null;
+ String parentCasReferenceId = cacheEntry.getInputCasReferenceId();
+ ServicePerformance casStats = ((AggregateAnalysisEngineController) controller)
+ .getCasStatistics(cacheEntry.getCasReferenceId());
+ if (parentCasReferenceId != null && controller.getInProcessCache().entryExists(parentCasReferenceId)) {
+ String casProducerKey = cacheEntry.getCasProducerKey();
+ if (casProducerKey != null
+ && ((AggregateAnalysisEngineController) controller).isDelegateKeyValid(casProducerKey)) {
+ // Get entry for the parent CAS
+ parentCasEntry = controller.getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+ }
+
+ }
+ ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) controller)
+ .getServicePerformance(delegateKey);
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
+ long timeToSerializeCAS = ((Long) aMessageContext
+ .getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
+ if (timeToSerializeCAS > 0) {
+ if (delegateServicePerformance != null) {
+ delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
+ }
+ }
+ }
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
+ long timeToDeserializeCAS = ((Long) aMessageContext
+ .getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
+ if (timeToDeserializeCAS > 0) {
+ if (delegateServicePerformance != null) {
+ delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
+ }
+ }
+ }
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
+ long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
+ if (idleTime > 0 && delegateServicePerformance != null) {
+ Endpoint endp = aMessageContext.getEndpoint();
+ if (endp != null && endp.isRemote()) {
+ delegateServicePerformance.incrementIdleTime(idleTime);
+ }
+ }
+ }
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
+ long timeWaitingForCAS = ((Long) aMessageContext
+ .getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
+ if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
+ cacheEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+ delegateServicePerformance.incrementCasPoolWaitTime(
+ timeWaitingForCAS - delegateServicePerformance.getRawCasPoolWaitTime());
+ if (parentCasEntry != null) {
+ parentCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+ }
+ }
+ }
+ if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
+ long timeInProcessCAS = ((Long) aMessageContext
+ .getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
+ Endpoint endp = aMessageContext.getEndpoint();
+ if (endp != null && endp.isRemote()) {
+ if (delegateServicePerformance != null) {
+ // calculate the time spent in analysis. The remote service returns total time
+ // spent in the analysis. Compute the delta.
+ long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
+ // increment total time in analysis
+ delegateServicePerformance.incrementAnalysisTime(dt);
+ controller.getServicePerformance().incrementAnalysisTime(dt);
+ }
+ } else {
+ controller.getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+ }
+ casStats.incrementAnalysisTime(timeInProcessCAS);
+
+ if (parentCasReferenceId != null) {
+ ServicePerformance inputCasStats = ((AggregateAnalysisEngineController) controller)
+ .getCasStatistics(parentCasReferenceId);
+ // Update processing time for this CAS
+ if (inputCasStats != null) {
+ inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+ }
+ }
+
+ }
+ } catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+public class ReleaseCASRequestCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ public ReleaseCASRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+
+ public void execute() throws Exception {
+ String casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.FINE, getClass().getName(),
+ "execute", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_release_cas_req__FINE",
+ new Object[] { controller.getName(), casReferenceId });
+ }
+ System.out.println("???? Controller:"+controller.getComponentName()+" ReleaseCASRequestCommand.execute()- Releasing CAS:"+casReferenceId);
+ controller.releaseNextCas(casReferenceId);
+
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.util.Level;
+
+public class StopRequestCommand extends AbstractUimaAsCommand {
+ private MessageContext mc;
+
+ public StopRequestCommand(MessageContext mc, AnalysisEngineController controller) {
+ super(controller);
+ this.mc = mc;
+ }
+
+ public void execute() throws Exception {
+
+ try {
+ String casReferenceId = mc.getMessageStringProperty(AsynchAEMessage.CasReference);
+ if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), "execute",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_received_stop_request__INFO",
+ new Object[] { controller.getComponentName(), casReferenceId });
+ }
+
+ if (controller.isPrimitive()) {
+ controller.addAbortedCasReferenceId(casReferenceId);
+ } else {
+ CasStateEntry casStateEntry = super.getCasStateEntry(casReferenceId);
+ // Mark the CAS as if it have failed. In this case we dont associate any
+ // exceptions with this CAS so its really not a failure of a CAS or any
+ // of its children. We simply use the same logic here as if the CAS failed.
+ // The Aggregate replyToClient() method will know that this CAS was stopped
+ // as opposed to failed by the fact that the CAS has no exceptions associated
+ // with it. In such case the replyToClient() method returns an input CAS as if
+ // it has been fully processed.
+ casStateEntry.setFailed();
+ ((AggregateAnalysisEngineController_impl) controller).stopCasMultipliers();
+ }
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(this.getClass()).isLoggable(Level.WARNING)) {
+ if (controller != null) {
+ UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(), "execute",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
+ controller.getComponentName());
+ }
+
+ UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, getClass().getName(), "execute",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ }
+ }
+
+
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsCommand.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.command;
+
+public interface UimaAsCommand {
+ public abstract void execute() throws Exception;
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AggregateAnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.uima.aae.error.ErrorHandlerChain;
+
+public class AggregateAnalysisEngineDelegate extends AnalysisEngineDelegate {
+ private ErrorHandlerChain delegateErrorHandlerChain;
+ private List<AnalysisEngineDelegate> delegates = new LinkedList<>();
+
+ public AggregateAnalysisEngineDelegate(String key) {
+ super(key);
+ setPrimitive(false);
+ }
+ public void addDelegate(AnalysisEngineDelegate delegate) {
+ delegates.add(delegate);
+ }
+ public List<AnalysisEngineDelegate> getDelegates() {
+ return delegates;
+ }
+ public ErrorHandlerChain getDelegateErrorHandlerChain() {
+ return delegateErrorHandlerChain;
+ }
+
+ public void setDelegateErrorHandlerChain(ErrorHandlerChain delegateErrorHandlerChain) {
+ this.delegateErrorHandlerChain = delegateErrorHandlerChain;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/AnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+
+public class AnalysisEngineDelegate {
+ private String key;
+ private boolean remote=false;
+ private int scaleout=1;
+ private int replyScaleout=1;
+ private boolean primitive;
+ private boolean async=false;
+ private AnalysisEngineDescription resourceSpecifier=null;
+ private CasMultiplierNature cm=null;
+ private int getMetaTimeout = 0;
+ private int processTimeout=0;
+ private int cpcTimeout=0;
+
+ public AnalysisEngineDelegate() {
+ this("");
+ }
+ public AnalysisEngineDelegate(String key) {
+ setKey(key);
+ }
+ public boolean isCasMultiplier() {
+ return cm != null;
+ }
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public boolean isRemote() {
+ return remote;
+ }
+ public void setRemote(boolean remote) {
+ this.remote = remote;
+ }
+ public int getScaleout() {
+ return scaleout;
+ }
+ public void setScaleout(int scaleout) {
+ this.scaleout = scaleout;
+ }
+ public boolean isPrimitive() {
+ return primitive;
+ }
+ public void setPrimitive(boolean primitive) {
+ this.primitive = primitive;
+ }
+ public AnalysisEngineDescription getResourceSpecifier() {
+ return resourceSpecifier;
+ }
+ public void setResourceSpecifier(AnalysisEngineDescription resourceSpecifier) {
+ this.resourceSpecifier = resourceSpecifier;
+ }
+ public CasMultiplierNature getCasMultiplier() {
+ return cm;
+ }
+ public void setCm(CasMultiplierNature cm) {
+ this.cm = cm;
+ }
+ public boolean isAsync() {
+ return async;
+ }
+ public void setAsync(boolean async) {
+ this.async = async;
+ }
+ public int getGetMetaTimeout() {
+ return getMetaTimeout;
+ }
+ public void setGetMetaTimeout(int getMetaTimeout) {
+ this.getMetaTimeout = getMetaTimeout;
+ }
+ public int getProcessTimeout() {
+ return processTimeout;
+ }
+ public void setProcessTimeout(int processTimeout) {
+ this.processTimeout = processTimeout;
+ }
+ public int getCpcTimeout() {
+ return cpcTimeout;
+ }
+ public void setCpcTimeout(int cpcTimeout) {
+ this.cpcTimeout = cpcTimeout;
+ }
+ public int getReplyScaleout() {
+ return replyScaleout;
+ }
+ public void setReplyScaleout(int setReplyScaleout) {
+ this.replyScaleout = setReplyScaleout;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/CasMultiplierNature.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+public class CasMultiplierNature {
+ int poolSize = 1;
+ int initialHeapSize = 500;
+ boolean processParentLast = false;
+ boolean disableJCasCache = false;
+
+ public int getPoolSize() {
+ return poolSize;
+ }
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
+ }
+ public int getInitialHeapSize() {
+ return initialHeapSize;
+ }
+ public void setInitialHeapSize(int initialHeapSize) {
+ this.initialHeapSize = initialHeapSize;
+ }
+ public boolean isProcessParentLast() {
+ return processParentLast;
+ }
+ public void setProcessParentLast(boolean processParentLast) {
+ this.processParentLast = processParentLast;
+ }
+ public boolean disableJCasCache() {
+ return disableJCasCache;
+ }
+ public void disableJCasCache(boolean disableJCasCache) {
+ this.disableJCasCache = disableJCasCache;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/delegate/RemoteAnalysisEngineDelegate.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.service.delegate;
+
+import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder.Serialization;
+
+public class RemoteAnalysisEngineDelegate extends AnalysisEngineDelegate {
+ private String queueName;
+ private String brokerURI;
+ private int prefetch=0;
+ private String serialization = Serialization.XMI.toString();
+ public RemoteAnalysisEngineDelegate( String key ) {
+ super(key);
+ super.setRemote(true);
+ }
+ public String getQueueName() {
+ return queueName;
+ }
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+ public String getBrokerURI() {
+ return brokerURI;
+ }
+ public void setBrokerURI(String brokerURI) {
+ this.brokerURI = brokerURI;
+ }
+ public boolean isCollocated() {
+ return brokerURI.equalsIgnoreCase("java");
+ }
+ public String getSerialization() {
+ return serialization;
+ }
+ public void setSerialization(String serialization) {
+ this.serialization = serialization;
+ }
+ public int getPrefetch() {
+ return prefetch;
+ }
+ public void setPrefetch(int prefetch) {
+ this.prefetch = prefetch;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectInputChannel.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.handler.Handler;
+import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
+import org.apache.uima.aae.service.command.CommandFactory;
+import org.apache.uima.aae.service.command.UimaAsCommand;
+
+public class DirectInputChannel implements InputChannel {
+ private static final Class<?> CLASS_NAME = DirectInputChannel.class;
+ AnalysisEngineController controller;
+ Handler handler;
+ String endpointName;
+ private ServiceInfo serviceInfo = null;
+ private boolean isStopped;
+ private List<Listener> listeners = new ArrayList<>();
+ private ENDPOINT_TYPE type = ENDPOINT_TYPE.DIRECT;
+ private ChannelType channelType;
+
+ public DirectInputChannel(ChannelType type) {
+ this.channelType = type;
+ }
+
+ public ChannelType getChannelType() {
+ return channelType;
+ }
+
+ public ENDPOINT_TYPE getType() {
+ return type;
+ }
+
+ public DirectInputChannel withController(AnalysisEngineController controller) {
+ this.controller = controller;
+ return this;
+ }
+
+ public void setController(AnalysisEngineController controller) {
+ this.controller = controller;
+ }
+
+ public void onMessage(DirectMessage message) {
+
+ try {
+ // every message is wrapped in the MessageContext
+ MessageContext mc = new DirectMessageContext(message, "", getController().getComponentName());
+ if (validMessage(mc)) {
+ UimaAsCommand cmd = CommandFactory.newCommand(mc, controller);
+ cmd.execute();
+ }
+ } catch (Exception t) {
+ t.printStackTrace();
+ }
+ }
+
+ public AnalysisEngineController getController() {
+ return controller;
+ }
+
+ /**
+ * Validate command contained in the header of the JMS Message
+ *
+ * @param aMessage
+ * - JMS Message received
+ * @param properties
+ * - Map containing header properties
+ * @return - true if the command received is a valid one, false otherwise
+ * @throws Exception
+ */
+ private boolean validCommand(MessageContext aMessage) throws Exception {
+ if (aMessage.propertyExists(AsynchAEMessage.Command)) {
+ int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+ if (command != AsynchAEMessage.Process && command != AsynchAEMessage.GetMeta
+ && command != AsynchAEMessage.ReleaseCAS && command != AsynchAEMessage.Stop
+ && command != AsynchAEMessage.Ping && command != AsynchAEMessage.ServiceInfo
+ && command != AsynchAEMessage.CollectionProcessComplete) {
+ System.out.println(CLASS_NAME + ".validCommand() - invalid command in message " + command);
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validCommand", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_command_in_message__INFO", new Object[] { command,
+ * endpointName }); }
+ */
+ return false;
+ }
+ } else {
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validCommand", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_command_notin_message__INFO", new Object[] { endpointName }); }
+ */
+ System.out.println(CLASS_NAME + ".validCommand() - command not in message");
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Validates contents of the message. It checks if command, payload and message
+ * types contain valid data.
+ *
+ * @param aMessage
+ * - JMS Message to validate
+ * @return - true if message is valid, false otherwise
+ * @throws Exception
+ */
+ public boolean validMessage(MessageContext aMessage) throws Exception {
+ if (aMessage instanceof DirectMessageContext) {
+ // Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ if (!validMessageType(aMessage)) {
+ int msgType = 0;
+ if (aMessage.propertyExists(AsynchAEMessage.MessageType)) {
+ msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+ }
+ System.out.println(CLASS_NAME + ".validMessage() - invalid message type " + msgType);
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_msg_type__INFO", new Object[] {
+ * getController().getComponentName(), msgType }); }
+ */
+ return false;
+ }
+ if (!validCommand(aMessage)) {
+ int command = 0;
+ if (aMessage.propertyExists(AsynchAEMessage.Command)) {
+ command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+ }
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_cmd_type__INFO", new Object[] {
+ * getController().getComponentName(), command }); }
+ */
+ System.out.println(CLASS_NAME + ".validMessage() - invalid command in message " + command);
+
+ return false;
+ }
+ if (!validPayload(aMessage)) {
+ int payload = 0;
+ if (aMessage.propertyExists(AsynchAEMessage.Payload)) {
+ payload = aMessage.getMessageIntProperty(AsynchAEMessage.Payload);
+ }
+ System.out.println(CLASS_NAME + ".validMessage() - invalid payload type " + payload);
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_payload_type__INFO", new Object[] {
+ * getController().getComponentName(), payload }); }
+ */
+ return false;
+ }
+
+ if (isStaleMessage(aMessage)) {
+ return false;
+ }
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ /**
+ * Validate message type contained in the JMS header.
+ *
+ * @param aMessage
+ * - jms message retrieved from queue
+ * @param properties
+ * - map containing message properties
+ * @return - true if message Type is valid, false otherwise
+ * @throws Exception
+ */
+
+ private boolean validMessageType(MessageContext aMessage) throws Exception {
+ if (aMessage.propertyExists(AsynchAEMessage.MessageType)) {
+ int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+ if (msgType != AsynchAEMessage.Response && msgType != AsynchAEMessage.Request) {
+ System.out.println(CLASS_NAME + ".validMessageType() - invalid message type " + msgType);
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_msgtype_in_message__INFO", new Object[] { msgType,
+ * endpointName }); }
+ */
+ return false;
+
+ }
+ } else {
+ System.out.println(CLASS_NAME + ".validMessageType() - message type not in message");
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_msgtype_notin_message__INFO", new Object[] { endpointName }); }
+ */
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean isProcessRequest(MessageContext aMessage) throws Exception {
+ if (aMessage.propertyExists(AsynchAEMessage.MessageType) && aMessage.propertyExists(AsynchAEMessage.Command)) {
+ int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+
+ if (msgType != AsynchAEMessage.Request || command != AsynchAEMessage.Process) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isRemoteRequest(MessageContext aMessage) throws Exception {
+
+ // Dont do checkpoints if a message was sent from a Cas Multiplier
+ if (aMessage.propertyExists(AsynchAEMessage.CasSequence)) {
+ return false;
+ }
+
+ if (aMessage.propertyExists(AsynchAEMessage.MessageType) && aMessage.propertyExists(AsynchAEMessage.Command)
+ && aMessage.propertyExists(UIMAMessage.ServerURI)) {
+ int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+ boolean isRemote = aMessage.getMessageStringProperty(UIMAMessage.ServerURI).startsWith("vm") == false;
+ if (isRemote && msgType == AsynchAEMessage.Request
+ && (command == AsynchAEMessage.Process || command == AsynchAEMessage.CollectionProcessComplete)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean acceptsDeltaCas(MessageContext aMessage) throws Exception {
+ boolean acceptsDeltaCas = false;
+ if (aMessage.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
+ acceptsDeltaCas = aMessage.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+ }
+ return acceptsDeltaCas;
+ }
+
+ public void abort() {
+ }
+
+ private String decodeIntToString(String aTypeToDecode, int aValueToDecode) {
+ if (AsynchAEMessage.MessageType.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.Request:
+ return "Request";
+ case AsynchAEMessage.Response:
+ return "Response";
+ }
+ } else if (AsynchAEMessage.Command.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.Process:
+ return "Process";
+ case AsynchAEMessage.GetMeta:
+ return "GetMetadata";
+ case AsynchAEMessage.CollectionProcessComplete:
+ return "CollectionProcessComplete";
+ case AsynchAEMessage.ReleaseCAS:
+ return "ReleaseCAS";
+ case AsynchAEMessage.Stop:
+ return "Stop";
+ case AsynchAEMessage.Ping:
+ return "Ping";
+ case AsynchAEMessage.ServiceInfo:
+ return "ServiceInfo";
+ }
+
+ } else if (AsynchAEMessage.Payload.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.XMIPayload:
+ return "XMIPayload";
+ case AsynchAEMessage.BinaryPayload:
+ return "BinaryPayload";
+ case AsynchAEMessage.CASRefID:
+ return "CASRefID";
+ case AsynchAEMessage.Metadata:
+ return "Metadata";
+ case AsynchAEMessage.Exception:
+ return "Exception";
+ // 5/2013 xcas not used
+ // case AsynchAEMessage.XCASPayload:
+ // return "XCASPayload";
+ case AsynchAEMessage.None:
+ return "None";
+ }
+ }
+ return "UNKNOWN";
+ }
+
+ private boolean isStaleMessage(MessageContext aMessage) throws AsynchAEException {
+ int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+ int msgType = aMessage.getMessageIntProperty(AsynchAEMessage.MessageType);
+ if (isStopped() || getController() == null || getController().getInProcessCache() == null) {
+ // Shutting down
+ return true;
+ }
+ if (command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response) {
+ String casReferenceId = aMessage.getMessageStringProperty(AsynchAEMessage.CasReference);
+ if (!getController().getInProcessCache().entryExists(casReferenceId)) {
+
+ System.out.println(CLASS_NAME + ".isStaleMessage() - stale message rec'd - CasRefId:" + casReferenceId);
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb( Level.FINE, CLASS_NAME.getName(),
+ * "isStaleMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_stale_message__FINE", new Object[] { endpointName, casReferenceId,
+ * aMessage.getMessageStringProperty(AsynchAEMessage.MessageFrom) });
+ *
+ * }
+ */
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Validates payload in the JMS Message.
+ *
+ * @param aMessage
+ * - JMS Message received
+ * @param properties
+ * - Map containing header properties
+ * @return - true if the payload is valid, false otherwise
+ * @throws Exception
+ */
+ private boolean validPayload(MessageContext aMessage) throws Exception {
+ if (aMessage.propertyExists(AsynchAEMessage.Command)) {
+ int command = aMessage.getMessageIntProperty(AsynchAEMessage.Command);
+ if (command == AsynchAEMessage.GetMeta || command == AsynchAEMessage.CollectionProcessComplete
+ || command == AsynchAEMessage.Stop || command == AsynchAEMessage.Ping
+ || command == AsynchAEMessage.ServiceInfo || command == AsynchAEMessage.ReleaseCAS) {
+ // Payload not included in GetMeta Request
+ return true;
+ }
+ }
+
+ if (aMessage.propertyExists(AsynchAEMessage.Payload)) {
+ int payload = aMessage.getMessageIntProperty(AsynchAEMessage.Payload);
+ if (payload != AsynchAEMessage.XMIPayload && payload != AsynchAEMessage.BinaryPayload
+ && payload != AsynchAEMessage.CASRefID && payload != AsynchAEMessage.Exception
+ && payload != AsynchAEMessage.Metadata) {
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validPayload", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_invalid_payload_in_message__INFO", new Object[] { payload,
+ * endpointName }); }
+ */
+ System.out.println(CLASS_NAME + ".validPayload() - Invalid Payload in message:" + payload);
+ return false;
+ }
+ } else {
+ /*
+ * if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ * UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ * "validPayload", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ * "UIMAJMS_payload_notin_message__INFO", new Object[] { endpointName }); }
+ */
+ System.out.println(CLASS_NAME + ".validPayload() - Payload not in message");
+ return false;
+ }
+
+ return true;
+ }
+
+ public void setMessageHandler(Handler aHandler) {
+ handler = aHandler;
+ // msgHandlerLatch.countDown();
+ }
+
+ public List<Listener> registerListener(Listener listener) {
+ listeners.add(listener);
+ return listeners;
+ }
+
+ public void setListener(DirectListener listener) {
+ listeners.add(listener);
+ }
+
+ public void setListeners(List<DirectListener> list) {
+ System.out.println("... DirectInputChannel adding listeners - howMany:" + list.size());
+ listeners.addAll(list);
+
+ }
+
+ public List<Listener> getListeners() {
+ List<Listener> ll = new ArrayList<>();
+ for (Listener l : listeners) {
+ ll.add(l);
+ }
+ return ll;
+ }
+
+ @Override
+ public void stop(boolean shutdownNow) throws Exception {
+ isStopped = true;
+ for (Listener l : listeners) {
+ l.stop();
+ }
+ }
+
+ @Override
+ public void stop(int channelsToStop, boolean shutdownNow) throws Exception {
+ // TODO Auto-generated method stub
+ for (Listener l : listeners) {
+ l.stop();
+ }
+ }
+
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getSessionAckMode() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void ackMessage(MessageContext aMessageContext) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getServerUri() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setServerUri(String aServerUri) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getInputQueueName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public ServiceInfo getServiceInfo() {
+ if (serviceInfo == null) {
+ serviceInfo = new ServiceInfo(false, controller);
+ serviceInfo.setBrokerURL("Direct");
+ serviceInfo.setInputQueueName(getName());
+ if (controller == null) {
+ serviceInfo.setState(ServiceState.INITIALIZING.name());
+ } else {
+ if (controller.isCasMultiplier()) {
+ serviceInfo.setCASMultiplier();
+ }
+ }
+ }
+ return serviceInfo;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return isStopped;
+ }
+
+ public int getConcurrentConsumerCount() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void destroyListener(String anEndpointName, String aDelegateKey) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isFailed(String aDelegateKey) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isListenerForDestination(String anEndpointName) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void removeDelegateFromFailedList(String aDelegateKey) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setTerminating() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void terminate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void disconnectListenersFromQueue() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void disconnectListenerFromQueue(Listener listener) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setEndpointName(String name) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void createListenerForTargetedMessages() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.spi.transport.vm.UimaVmQueue;
+import org.springframework.core.task.TaskExecutor;
+
+public class DirectListener implements Listener, JavaQueueListener {
+ private BlockingQueue<DirectMessage> inQueue;
+ private DirectInputChannel ic;
+ private int consumerCount = 1;
+ private ExecutorService executor;
+ private AnalysisEngineController controller;
+ private int scaleout;
+ private Thread msgHandlerThread;
+ private BlockingQueue<Runnable> workQueue = null;
+ private CountDownLatch latchToCountNumberOfInitedThreads;
+ private CountDownLatch latchToCountNumberOfTerminatedThreads;
+ private Type type;
+ private Transport transport = Transport.Java;
+ //private boolean hasTaskExecutor = false;
+ private String name=null;
+ private volatile boolean started = false;
+ public DirectListener(Type t) {
+ this.type = t;
+ System.out.println("DirectListener c'tor - type:"+type.name());
+ }
+
+ public DirectListener(AnalysisEngineController ctlr, Type t, int scaleout) {
+ this.controller = ctlr;
+ this.scaleout = scaleout;
+ this.type = t;
+ System.out.println("DirectListener c'tor - type:"+type.name());
+ }
+ public Transport getTransport() {
+ return transport;
+ }
+ public BlockingQueue<DirectMessage> getEndpoint() {
+ return inQueue;
+ }
+ public Type getType() {
+ return type;
+ }
+ public void setController(AnalysisEngineController ctlr) {
+ this.controller = ctlr;
+ }
+ public DirectListener withController(AnalysisEngineController ctlr) {
+ this.controller = ctlr;
+ return this;
+ }
+ public void setConsumerThreads(int howMany) {
+ this.scaleout = howMany;
+ latchToCountNumberOfInitedThreads = new CountDownLatch(howMany);
+ latchToCountNumberOfTerminatedThreads = new CountDownLatch(howMany);
+ }
+ public DirectListener withConsumerThreads(int howMany) {
+ this.scaleout = howMany;
+ latchToCountNumberOfInitedThreads = new CountDownLatch(howMany);
+ latchToCountNumberOfTerminatedThreads = new CountDownLatch(howMany);
+
+ return this;
+ }
+ public DirectListener withName(String name) {
+ this.name = name;
+ return this;
+ }
+ public void setQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
+ this.inQueue = inQueue;
+ }
+ public DirectListener withQueue(BlockingQueue<DirectMessage> inQueue) throws Exception {
+ this.inQueue = inQueue;
+ return this;
+ }
+ public void setConsumers(int howMany) {
+ consumerCount = howMany;
+ }
+
+ public DirectListener withInputChannel(DirectInputChannel ic) {
+ this.ic = ic;
+ return this;
+ }
+ public void setInputChannel(DirectInputChannel ic) {
+ this.ic = ic;
+ }
+
+ public DirectListener initialize() throws Exception {
+ if (Type.ProcessCAS.equals(type)) {
+ workQueue = new UimaVmQueue();
+ if ( controller instanceof PrimitiveAnalysisEngineController ) {
+ ThreadGroup threadGroup = new ThreadGroup("VmThreadGroup" + 1 + "_" + controller.getComponentName());
+ executor = new ThreadPoolExecutor(scaleout, scaleout, Long.MAX_VALUE, TimeUnit.DAYS, workQueue);
+ UimaAsThreadFactory tf = null;
+ tf = new UimaAsThreadFactory().
+ withThreadGroup(threadGroup).
+ withPrimitiveController((PrimitiveAnalysisEngineController)controller).
+ withTerminatedThreadsLatch(latchToCountNumberOfTerminatedThreads).
+ withInitedThreadsLatch(latchToCountNumberOfInitedThreads);
+ tf.setDaemon(true);
+ ((ThreadPoolExecutor)executor).setThreadFactory(tf);
+ ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+ latchToCountNumberOfInitedThreads.await();
+ System.out.println("Executor Started - All Process Threads Initialized");
+ } else {
+ executor = Executors.newFixedThreadPool(consumerCount);
+ }
+ } else {
+ executor = Executors.newFixedThreadPool(consumerCount);
+ }
+ return this;
+ }
+
+ private boolean stopConsumingMessages(DirectMessage message ) {
+ return message.getAsInt(AsynchAEMessage.Command) == AsynchAEMessage.PoisonPill;
+
+ }
+ public synchronized void start() {
+ if ( started ) {
+ return;
+ }
+ System.out.println(">>> "+controller.getComponentName()+" DirectListener.start()");
+ msgHandlerThread = new Thread() {
+ public void run() {
+ started = true;
+ boolean stop = false;
+
+ while( !controller.isStopped() && !stop) {
+ try {
+
+ final DirectMessage message = inQueue.take(); //blocks if empty
+ if ( stopConsumingMessages(message)) {
+ System.out.println(">>> "+controller.getComponentName()+" Got END message - Stopping Queue Consumer");
+ stop = true;
+ } else {
+ executor.submit(new Runnable() {
+ public void run() {
+
+ try {
+ System.out.println(">>> "+controller.getComponentName()+" Got new message - processing on thread "+Thread.currentThread().getName()+" channel:"+getType());
+ ic.onMessage(message);
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ } catch( InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ System.out.println(getType()+ " Listener Thread Interrupted - Stop Listening");
+ stop = true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ stop = true;
+ }
+ }
+ }
+ };
+ msgHandlerThread.start();
+ }
+ public void stop() {
+ msgHandlerThread.interrupt();
+
+ DirectMessage message = new DirectMessage().withCommand(AsynchAEMessage.PoisonPill);
+ try {
+ inQueue.put(message);
+ } catch( InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if ( executor != null ) {
+ executor.shutdown();
+
+ }
+ }
+
+ public TaskExecutor getTaskExecutor() {
+ return null;
+ }
+
+ public String getName() {
+ if ( name != null ) {
+ return name;
+ }
+ return controller.getKey();
+ }
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+public class DirectMessage {
+
+ private static final long serialVersionUID = 1L;
+ private Map<String, Object> stateMap =
+ new HashMap<>();
+
+ private void store(String key, Object value) {
+ stateMap.put(key, value);
+ }
+ public DirectMessage withCasReferenceId(String casReferenceId) {
+ store(AsynchAEMessage.CasReference, casReferenceId);
+ return this;
+ }
+ public DirectMessage withEndpointName(String endpointName) {
+ store(AsynchAEMessage.EndpointName, endpointName);
+ return this;
+ }
+
+ public DirectMessage withParentCasReferenceId(String parentCasReferenceId) {
+ store(AsynchAEMessage.InputCasReference, parentCasReferenceId);
+ return this;
+ }
+ public DirectMessage withSequenceNumber(long seqNo) {
+ store(AsynchAEMessage.CasSequence, seqNo);
+ return this;
+ }
+ public DirectMessage withStat(String statKey, long stat) {
+ store(statKey, stat);
+ return this;
+ }
+ public DirectMessage withError(Throwable t) {
+ store(AsynchAEMessage.ErrorCause, t);
+ return this;
+ }
+ public DirectMessage withFreeCASQueue(BlockingQueue<DirectMessage> freeCASQueue) {
+ store(AsynchAEMessage.FreeCASQueue, freeCASQueue);
+ return this;
+
+ }
+ public DirectMessage withCommand(int command) {
+ store(AsynchAEMessage.Command, command);
+ if ( command == AsynchAEMessage.Process ) {
+ if (System.getProperty("UimaAsCasTracking") != null) {
+ store("UimaAsCasTracking", "enable");
+ }
+ }
+
+ return this;
+ }
+ public DirectMessage withMessageType(int messageType) {
+ store(AsynchAEMessage.MessageType, messageType);
+ return this;
+ }
+ public DirectMessage withOrigin(String origin) {
+ store(AsynchAEMessage.MessageFrom, origin);
+ return this;
+ }
+ public DirectMessage withPayload(int payloadType) {
+ store(AsynchAEMessage.Payload, payloadType);
+ return this;
+ }
+
+ public DirectMessage withReplyQueue(BlockingQueue<DirectMessage> replyQueue ) {
+ store(AsynchAEMessage.ReplyToEndpoint, replyQueue);
+ return this;
+ }
+ public DirectMessage withMetadata(ProcessingResourceMetaData aProcessingResourceMetadata) {
+ store(AsynchAEMessage.AEMetadata, aProcessingResourceMetadata);
+ return this;
+ }
+ public DirectMessage withSerializationType(int serializationType) {
+ store(AsynchAEMessage.SERIALIZATION, serializationType);
+ return this;
+ }
+ public DirectMessage withReplyDestination(Object replyDestination) {
+ store(AsynchAEMessage.ReplyToEndpoint, replyDestination);
+ return this;
+ }
+ public DirectMessage withDelegateKey(Object delegateKey) {
+ store(AsynchAEMessage.DelegateKey, delegateKey);
+ return this;
+ }
+ public String getAsString(String key) {
+ return (String)stateMap.get(key);
+ }
+ public int getAsInt(String key) {
+ return (Integer)stateMap.get(key);
+ }
+ public BlockingQueue<DirectMessage> getFreeQueue(String key) {
+ return (BlockingQueue<DirectMessage>)stateMap.get(key);
+ }
+ public Object get(String key) {
+ return stateMap.get(key);
+ }
+ public boolean propertyExists(String key) {
+ return stateMap.containsKey(key);
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.cas.SerialFormat;
+
+public class DirectMessageContext implements MessageContext {
+ private Endpoint endpoint;
+
+ private long messageArrivalTime = 0L;
+
+ private String endpointName;
+
+ DirectMessage message;
+
+ public DirectMessageContext() {
+ }
+
+ public DirectMessageContext(DirectMessage message, String anEndpointName, String controllerName ) {
+ this();
+ this.message = message;
+ endpoint = new Endpoint_impl();
+ endpointName = anEndpointName;
+ endpoint.setSerialFormat(SerialFormat.UNKNOWN);
+ endpoint.setServerURI("java");
+ endpoint.setEndpoint(anEndpointName);
+ endpoint.setReplyDestination(message.get(AsynchAEMessage.ReplyToEndpoint));
+ endpoint.setDelegateKey(message.getAsString(AsynchAEMessage.DelegateKey));
+ StringBuilder sb = new StringBuilder();
+ if ( controllerName != null && !controllerName.trim().isEmpty()) {
+ sb.append("Service:"+controllerName+" ");
+ sb.append("Delegate Key:"+endpoint.getDelegateKey()+"\t");
+
+ }
+ sb.append(this.getClass().getSimpleName()).append("\n").append(" - message from:").
+ append(message.get(AsynchAEMessage.MessageFrom)).append("\n").
+ append(" - ServerURI:").append("java").append("\n");
+ if ( message.propertyExists(AsynchAEMessage.CasReference) ) {
+ sb.append(" - CasReferenceId:").
+ append(message.getAsString(AsynchAEMessage.CasReference))
+ .append("\n");
+ }
+
+ System.out.println(sb.toString());
+ }
+
+ public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException {
+ return (String)message.get(aMessagePropertyName);
+ }
+
+ public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException {
+ return (Integer)message.get(aMessagePropertyName);
+ }
+
+ public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException {
+ return (Long)message.get(aMessagePropertyName);
+ }
+
+ public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException {
+ return message.get(aMessagePropertyName);
+ }
+
+ public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException {
+ return (Boolean)message.get(aMessagePropertyName);
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ private Object getMessageCargo() {
+ return message.get("Cargo");
+ }
+ public String getStringMessage() throws AsynchAEException {
+ return (String)getMessageCargo();
+ }
+
+ public Object getObjectMessage() throws AsynchAEException {
+ return getMessageCargo();
+ }
+
+ public byte[] getByteMessage() throws AsynchAEException {
+ return null; // this class returns XMI only
+ }
+
+ public Object getRawMessage() {
+ return message;
+ }
+
+ public boolean propertyExists(String aKey) throws AsynchAEException {
+ return message.propertyExists(aKey);
+ }
+
+ public void setMessageArrivalTime(long anArrivalTime) {
+ messageArrivalTime = anArrivalTime;
+ }
+
+ public long getMessageArrivalTime() {
+ return messageArrivalTime;
+ }
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectOutputChannel.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.uima.aae.OutputChannel;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.error.UimaEEServiceException;
+import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+
+public class DirectOutputChannel implements OutputChannel {
+ private AnalysisEngineController controller;
+ private volatile boolean aborting = false;
+ private BlockingQueue<DirectMessage> freeCASQueue;
+
+ public DirectOutputChannel withController(AnalysisEngineController controller) {
+ this.controller = controller;
+ return this;
+ }
+
+ public void setFreeCASQueue(BlockingQueue<DirectMessage> queue) {
+ freeCASQueue = queue;
+ }
+
+ public ENDPOINT_TYPE getType() {
+ return ENDPOINT_TYPE.DIRECT;
+ }
+
+ @Override
+ public void stop(boolean shutdownNow) throws Exception {
+ aborting = true;
+ }
+
+ @Override
+ public void stop(int channelsToStop, boolean shutdownNow) throws Exception {
+
+ }
+
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public void setController(AnalysisEngineController aController) {
+ withController(aController);
+ }
+
+ @Override
+ public void initialize() throws AsynchAEException {
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+
+ BlockingQueue<DirectMessage> requestQueue = null;
+
+ String origin = controller.getServiceEndpointName();
+ if ( !controller.isTopLevelComponent()) {
+ origin = controller.getKey();
+ }
+ if ( origin == null ) {
+ origin = controller.getComponentName();
+ }
+ DirectMessage dm = new DirectMessage()
+ .withMessageType(AsynchAEMessage.Request)
+ .withCommand(aCommand)
+ .withDelegateKey(anEndpoint.getDelegateKey())
+ .withOrigin(origin);
+/*
+ dm.put(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ dm.put(AsynchAEMessage.Command, aCommand);
+ dm.put(AsynchAEMessage.MessageFrom, controller.getServiceEndpointName());
+ */
+ // InputChannel ic = controller.getInputChannel();
+ // List<Listener> listeners = ic.getListeners();
+
+ switch (aCommand) {
+ case AsynchAEMessage.CollectionProcessComplete:
+ requestQueue = (BlockingQueue<DirectMessage>) anEndpoint.getDestination();
+ dm.withReplyDestination(anEndpoint.getReplyDestination());
+ break;
+ case AsynchAEMessage.ReleaseCAS:
+ System.out.println(">>>>>>>>>>>>>>>>>>> DirectOutputChannel.sendRequest() - Sending ReleaseCAS Request");
+ dm.withCasReferenceId( aCasReferenceId);
+ requestQueue = (BlockingQueue<DirectMessage>) anEndpoint.getDestination();
+ break;
+ case AsynchAEMessage.GetMeta:
+ // every delegate has an endpoint containing a replyTo queue where
+ // a listener waiting for reply msgs.
+ dm.withReplyDestination( anEndpoint.getReplyDestination());
+
+ requestQueue = (BlockingQueue<DirectMessage>) anEndpoint.getMetaDestination();
+ System.out.println(">>>>>>>>>>>>>>>>>>> Service:" + controller.getComponentName()
+ + " DirectOutputChannel.sendRequest() - Sending GetMeta Request to delegate:"+anEndpoint.getDelegateKey()+" target queue.hashcode():"+requestQueue.hashCode());
+ // delegate = startGetMetaTimerAndGetDelegate(anEndpoint);
+// dm.withDelegateKey(anEndpoint.getDelegateKey());
+ break;
+ case AsynchAEMessage.Stop:
+ requestQueue = (BlockingQueue<DirectMessage>) anEndpoint.getDestination();
+ dm.withCasReferenceId(aCasReferenceId);
+ break;
+
+ case AsynchAEMessage.Process:
+ // Delegate must reply to a given endpoint
+ dm.withReplyDestination(anEndpoint.getReplyDestination());
+ // passing CAS by reference
+ dm.withPayload(AsynchAEMessage.CASRefID);
+ // id of a CAS which is needed to find it in a global cache
+ dm.withCasReferenceId(aCasReferenceId);
+ dm.withEndpointName(anEndpoint.getEndpoint());
+ requestQueue = (BlockingQueue<DirectMessage>) anEndpoint.getDestination();
+ break;
+
+ default:
+ dm.withPayload(AsynchAEMessage.None);
+
+ };
+
+ requestQueue.add(dm);
+ }
+
+ @Override
+ public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId, boolean notifyOnJmsException)
+ throws AsynchAEException {
+ System.out.println(".... Controller:" + controller.getComponentName()
+ + " DirectOutputChannel.sendReply() - CAS Id:" + aCasReferenceId+" Origin:"+controller.getServiceEndpointName());
+ @SuppressWarnings("unchecked")
+ BlockingQueue<DirectMessage> replyQueue =
+ (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+ DirectMessage replyMessage = new DirectMessage().withCommand(aCommand).withMessageType(AsynchAEMessage.Response)
+ .withOrigin(getOrigin()).withPayload(AsynchAEMessage.None);
+
+ replyQueue.add(replyMessage);
+
+ }
+
+
+// public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
+ public void sendReply(CasStateEntry casStateEntry, Endpoint anEndpoint) throws AsynchAEException {
+ @SuppressWarnings("unchecked")
+ BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+
+ DirectMessage message;
+
+
+// casStateEntry = controller.getLocalCache().lookupEntry(entry.getCasReferenceId());
+
+ ServicePerformance casStats =
+ controller.getCasStatistics(casStateEntry.getCasReferenceId());
+
+ if (controller.isCasMultiplier() && casStateEntry.isSubordinate()) {
+ String casAncestor = casStateEntry.getParentCasReferenceId();
+ if ( controller.isTopLevelComponent() ) {
+ casAncestor = casStateEntry.getInputCasReferenceId();
+ }
+ // This CM will send a child CAS to the Client for processing. The
+ message = new DirectMessage().withCommand(AsynchAEMessage.Process)
+ .withMessageType(AsynchAEMessage.Request).withOrigin(getOrigin())
+ .withPayload(AsynchAEMessage.CASRefID).withCasReferenceId(casStateEntry.getCasReferenceId())
+ .withParentCasReferenceId(casAncestor).withSequenceNumber(casStateEntry.getSequenceNumber())
+ .withFreeCASQueue(freeCASQueue);
+ System.out.println("..... Service:" + controller.getComponentName()
+ + " Sending Child CAS and FreeCAS Queue:" + freeCASQueue.hashCode()+" For CAS:"+casStateEntry.getCasReferenceId());
+
+ } else {
+ message = new DirectMessage().withCommand(AsynchAEMessage.Process)
+ .withMessageType(AsynchAEMessage.Response).withOrigin(getOrigin())//controller.getServiceEndpointName())
+ .withPayload(AsynchAEMessage.CASRefID)
+ .withCasReferenceId(casStateEntry.getCasReferenceId());
+ }
+ // add timing stats associated with the CAS processing
+ message.withStat(AsynchAEMessage.TimeToSerializeCAS, casStats.getRawCasSerializationTime())
+ .withStat(AsynchAEMessage.TimeToDeserializeCAS, casStats.getRawCasDeserializationTime())
+ .withStat(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime())
+ .withStat(AsynchAEMessage.IdleTime,
+ controller.getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process));
+
+ replyQueue.add(message);
+
+ }
+
+ private String getTopParentCasReferenceId(String casReferenceId) {
+ if (!controller.getLocalCache().containsKey(casReferenceId)) {
+ return null;
+ }
+ CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
+
+ if (casStateEntry.isSubordinate()) {
+ // Recurse until the top CAS reference Id is found
+ return getTopParentCasReferenceId(casStateEntry.getParentCasReferenceId());
+ }
+ // Return the top ancestor CAS id
+ return casStateEntry.getCasReferenceId();
+ }
+
+ @Override
+ public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata, Endpoint anEndpoint,
+ boolean serialize) throws AsynchAEException {
+ System.out.println("Service:" + controller.getName()
+ + " DirectOutputChannel.sendReply() - sending GetMeta Reply - MsgFrom:"+anEndpoint.getDelegateKey());//controller.getServiceEndpointName()+" Delegate Key:"+anEndpoint.getDelegateKey());//controller.getKey());
+ String msgFrom = controller.getName(); //controller.getServiceEndpointName();
+// if ( !controller.isTopLevelComponent()) {
+// msgFrom = controller.getKey();
+// }
+// if ( msgFrom == null ) {
+// msgFrom = anEndpoint.getDelegateKey();
+// }
+ DirectMessage getMetaReplyMessage = new DirectMessage().withCommand(AsynchAEMessage.GetMeta)
+ .withMessageType(AsynchAEMessage.Response).withOrigin(msgFrom)
+ .withPayload(AsynchAEMessage.Metadata);
+
+ // BlockingQueue<DirectMessage> replyQueue =
+ // (BlockingQueue<DirectMessage>)anEndpoint.getDestination();
+ @SuppressWarnings("unchecked")
+ BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+
+ // DirectMessage dm = new DirectMessage();
+ // dm.put(AsynchAEMessage.MessageType,AsynchAEMessage.Response);
+ // dm.put(AsynchAEMessage.Command,AsynchAEMessage.GetMeta);
+ // dm.put(AsynchAEMessage.Payload, AsynchAEMessage.Metadata);
+ // dm.put(AsynchAEMessage.MessageFrom, controller.getServiceId());
+ getMetaReplyMessage.withSerializationType(AsynchAEMessage.None);
+ getMetaReplyMessage.withMetadata(aProcessingResourceMetadata);
+ replyQueue.add(getMetaReplyMessage);
+ }
+
+ private UimaEEServiceException wrapErrorInUimaEEServiceException(Throwable t) {
+ if (!(t instanceof UimaEEServiceException)) {
+ UimaEEServiceException wrapper;
+ // Strip off AsyncAEException and replace with UimaEEServiceException
+ if (t instanceof AsynchAEException && t.getCause() != null) {
+ wrapper = new UimaEEServiceException(t.getCause());
+ } else {
+ wrapper = new UimaEEServiceException(t);
+ }
+ return wrapper;
+ } else {
+ return (UimaEEServiceException) t;
+ }
+ }
+
+ private String getOrigin() {
+ String origin = controller.getServiceEndpointName();
+
+ if ( !controller.isTopLevelComponent()) {
+ origin = controller.getKey();
+ }
+ return origin;
+ }
+ public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId, Endpoint anEndpoint,
+ int aCommand) throws AsynchAEException {
+ anEndpoint.setReplyEndpoint(true);
+ // the client expects an error wrapped in UimaEEServiceException
+ UimaEEServiceException errorWrapper = wrapErrorInUimaEEServiceException(t);
+
+ // construct reply msg containing the error
+ DirectMessage errorReplyMessage = new DirectMessage().withCommand(AsynchAEMessage.Process)
+ .withMessageType(AsynchAEMessage.Response).withOrigin(getOrigin())
+ .withCasReferenceId(aCasReferenceId).withParentCasReferenceId(aParentCasReferenceId)
+ .withError(errorWrapper).withPayload(AsynchAEMessage.Exception);
+ @SuppressWarnings("unchecked")
+ BlockingQueue<DirectMessage> replyQueue = (BlockingQueue<DirectMessage>) anEndpoint.getReplyDestination();
+ controller.dropStats(aCasReferenceId, getName());
+
+ replyQueue.add(errorReplyMessage);
+
+ }
+
+ @Override
+ public void bindWithClientEndpoint(Endpoint anEndpoint) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setServerURI(String aServerURI) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stop() {
+ // TODO Auto-generated method stub
+ aborting = true;
+ }
+
+ @Override
+ public void cancelTimers() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean isStopping() {
+ return aborting;
+ }
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/JavaQueueListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+public interface JavaQueueListener {
+ public void setConsumers(int howMany);
+
+}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java
URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java?rev=1825401&view=auto
==============================================================================
--- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java (added)
+++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/LifecycleListener.java Mon Feb 26 18:54:11 2018
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.as.client;
+
+public interface LifecycleListener {
+ public void start();
+ public void stop();
+
+}