You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/16 19:03:15 UTC
svn commit: r1071333 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Author: cwiklik
Date: Wed Feb 16 18:03:15 2011
New Revision: 1071333
URL: http://svn.apache.org/viewvc?rev=1071333&view=rev
Log:
UIMA-2055 Combined 4 versions of sendRequest() into one. Did the same with sendCasToRemoteEndpoint().
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1071333&r1=1071332&r2=1071333&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Feb 16 18:03:15 2011
@@ -19,8 +19,6 @@
package org.apache.uima.adapter.jms.activemq;
-import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.PrintWriter;
@@ -30,20 +28,16 @@ import java.net.InetAddress;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
@@ -54,17 +48,17 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.Channel;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.SerializerCache;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
@@ -546,65 +540,37 @@ public class JmsOutputChannel implements
return endpointConnection;
}
- /**
- * Sends request message to a delegate.
- *
- * @param aCommand
- * - the type of request [Process|GetMeta]
- * @param anEndpoint
- * - the destination where the delegate receives messages
- *
- * @throws AsynchAEException
- */
- public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint)
- throws AsynchAEException {
- try {
-
- if ( aCommand == AsynchAEMessage.ReleaseCAS ) {
- anEndpoint.setReplyEndpoint(true);
- anEndpoint.setIsCasMultiplier(true);
- anEndpoint.setFreeCasEndpoint(true);
- }
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
-
- TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
- if (aCommand == AsynchAEMessage.ReleaseCAS || aCommand == AsynchAEMessage.Stop) {
+ private void logRequest(String key, Endpoint anEndpoint ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "key", new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getDelegateKey() });
+ }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_release_cas_req__FINE",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint(),
- aCasReferenceId });
+ }
+ private Delegate startGetMetaTimerAndGetDelegate( Endpoint anEndpoint ) {
+ Delegate delegate = null;
+ if (anEndpoint.getDestination() != null) {
+ String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
+ .getPhysicalName().replaceAll(":", "_");
+ if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
+ String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ .lookUpDelegateKey(anEndpoint.getEndpoint());
+ ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
+ .getDelegateServiceInfo(delegateKey);
+ if (serviceInfo != null) {
+ serviceInfo.setReplyQueueName(replyQueueName);
+ serviceInfo.setServiceKey(delegateKey);
+ }
+ delegate = lookupDelegate(delegateKey);
+ if (delegate.getGetMetaTimeout() > 0) {
+ System.out.println("--------------- Service:"+getAnalysisEngineController().getComponentName()+" Starting getMeta Timer For Delegate:"+anEndpoint.getDelegateKey());
+ delegate.startGetMetaRequestTimer();
}
}
- // Only used to send a Stop or ReleaseCas request so probably no need to start a connection
- // timer ?
- endpointConnection.send(tm, 0, true);
- } catch (JMSException e) {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- } catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_shutdown__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
+ }
+ return delegate;
}
-
/**
* Sends request message to a delegate.
*
@@ -615,89 +581,54 @@ public class JmsOutputChannel implements
*
* @throws AsynchAEException
*/
- public void sendRequest(int aCommand, Endpoint anEndpoint) {
+ public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint)
+ throws AsynchAEException {
Delegate delegate = null;
try {
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
- TextMessage tm = endpointConnection.produceTextMessage("");
+ Message tm = endpointConnection.produceTextMessage("");
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- tm.setText(""); // Need this to prevent the Broker from throwing an exception when sending a
- // message to C++ service
+ switch(aCommand) {
+ case AsynchAEMessage.CollectionProcessComplete:
+ logRequest("UIMAEE_send_cpc_req__FINE", anEndpoint);
+ break;
+ case AsynchAEMessage.ReleaseCAS:
+ tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ logRequest("UIMAJMS_releasecas_request__endpoint__FINEST", anEndpoint);
+ break;
+ case AsynchAEMessage.GetMeta:
+ delegate = startGetMetaTimerAndGetDelegate(anEndpoint);
+ logRequest("UIMAEE_service_sending_getmeta_request__FINE", anEndpoint);
+ break;
+ case AsynchAEMessage.Stop:
+ tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ logRequest("UIMAEE_service_sending_stop_request__FINE", anEndpoint);
+ break;
+
+ case AsynchAEMessage.Process:
+ logRequest("UIMAEE_service_sending_process_request__FINE", anEndpoint);
+ serializeCasAndSend(getAnalysisEngineController().
+ getInProcessCache().
+ getCacheEntryForCAS(aCasReferenceId), anEndpoint);
+ return; /// <<<<< RETURN - Done here >>>>
+
+
+ };
populateHeaderWithRequestContext(tm, anEndpoint, aCommand);
// For remotes add a special property to the message. This property
// will be echoed back by the service. This property enables matching
// the reply with the right endpoint object managed by the aggregate.
- if (anEndpoint.isRemote()) {
- tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
- }
- boolean startTimer = false;
- // Start timer for endpoints that are remote and are managed by a different broker
- // than this service. If an endpoint contains a destination object, the outgoing
- // request will contain a JMSReplyTo object which will point to a temp queue
- if (anEndpoint.isRemote() && anEndpoint.getDestination() == null) {
- startTimer = true;
- }
- if (aCommand == AsynchAEMessage.CollectionProcessComplete) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_send_cpc_req__FINE", new Object[] { anEndpoint.getEndpoint() });
- }
- } else if (aCommand == AsynchAEMessage.ReleaseCAS) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINEST,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_releasecas_request__endpoint__FINEST",
- new Object[] { getAnalysisEngineController().getName(),
- endpointConnection.getEndpoint() });
- }
- } else if (aCommand == AsynchAEMessage.GetMeta) {
- if (anEndpoint.getDestination() != null) {
- String replyQueueName = ((ActiveMQDestination) anEndpoint.getDestination())
- .getPhysicalName().replaceAll(":", "_");
- if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
- String delegateKey = ((AggregateAnalysisEngineController) getAnalysisEngineController())
- .lookUpDelegateKey(anEndpoint.getEndpoint());
- ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
- .getDelegateServiceInfo(delegateKey);
- if (serviceInfo != null) {
- serviceInfo.setReplyQueueName(replyQueueName);
- serviceInfo.setServiceKey(delegateKey);
- }
- delegate = lookupDelegate(delegateKey);
- if (delegate.getGetMetaTimeout() > 0) {
- delegate.startGetMetaRequestTimer();
- }
- }
- } else if (!anEndpoint.isRemote()) {
- ServiceInfo serviceInfo = ((AggregateAnalysisEngineController) getAnalysisEngineController())
- .getServiceInfo();
- if (serviceInfo != null) {
- serviceInfo.setReplyQueueName(controllerInputEndpoint);
- }
- }
- } else {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINEST,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_metadata_request__endpoint__FINEST",
- new Object[] { endpointConnection.getEndpoint(),
- endpointConnection.getServerUri() });
- }
- }
- if (endpointConnection.send(tm, 0, startTimer) != true) {
+ tm.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());
+
+ if (endpointConnection.send(tm, 0, true) != true) {
throw new ServiceNotFoundException();
}
+ } catch (AsynchAEException e) {
+ throw e;
} catch (Exception e) {
if (delegate != null && aCommand == AsynchAEMessage.GetMeta) {
delegate.cancelDelegateTimer();
@@ -710,165 +641,43 @@ public class JmsOutputChannel implements
getAnalysisEngineController());
}
}
-
- public void sendRequest(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
- new Object[] { anEndpoint.getEndpoint() });
- }
- try {
- if (anEndpoint.isRemote()) {
- if (anEndpoint.getSerializer().equals("xmi")) {
- String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
- anEndpoint.isRetryEnabled());
- if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINEST,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_serialized_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
- }
- // Send process request to remote delegate and start timeout timer
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
- } else {
- byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId, anEndpoint,
- anEndpoint.isRetryEnabled());
- if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINEST,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_binary_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getEndpoint(), aCasReferenceId, serializedCAS });
- }
-
- // Send process request to remote delegate and start timeout timer
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
-
- }
-
- } else {
- // Not supported
+ private void serializeCasAndSend(CacheEntry entry, Endpoint anEndpoint) throws Exception {
+ if (anEndpoint.getSerializer().equals("xmi")) {
+ String serializedCAS = getSerializedCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
+ anEndpoint.isRetryEnabled());
+ if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_serialized_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
}
- } catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
+ } else {
+ byte[] serializedCAS = getBinaryCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
+ anEndpoint.isRetryEnabled());
+ if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "sendRequest",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_sending_binary_cas__FINEST",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
}
- }
+
+ // Send process request to remote delegate and start timeout timer
+ sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
- catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
}
- }
-
- /**
- * Sends request message to process CAS to the given destinations. This method enables processing
- * of the same CAS in multiple Analysis Engines at the same time.
- *
- * @param aCasReferenceId
- * @param endpoints
- * @throws AsynchAEException
- */
- public void sendRequest(String aCasReferenceId, Endpoint[] endpoints) throws AsynchAEException {
- Endpoint currentEndpoint = null;
- try {
- boolean cacheSerializedCas = endpointRetryEnabled(endpoints);
- // The default serialization strategy for parallel step is xmi.
- // Binary serialization doesnt support merge.
- endpoints[0].setSerializer("xmi");
-
- // Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel
- // Flow
- // must use the same format (either XCAS or XMI)
- String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId, endpoints[0],
- cacheSerializedCas);
- // Using provided endpoints, create JMS message for each destination and sent the serialized
- // CAS to it.
- for (int i = 0; i < endpoints.length; i++) {
- // For remote delegates, optionally cache serialized CAS in case a retry on timeout is
- // required.
- if (endpoints[i].isRemote()) {
-
- currentEndpoint = endpoints[i];
- if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINEST,
- CLASS_NAME.getName(),
- "sendRequest",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_sending_serialized_cas__FINEST",
- new Object[] { getAnalysisEngineController().getComponentName(),
- endpoints[i].getEndpoint(), aCasReferenceId, serializedCAS });
- }
-
- // The default serialization strategy for parallel step is xmi.
- // Binary serialization doesnt support merge.
- endpoints[i].setSerializer("xmi");
- sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0);
- } else {
- // Currently this use case is not supported. Parallel processing of CAS is only supported
- // with remote Delegates
- }
- }
- } catch (Exception e) {
- // Handle the error
- ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
- errorContext.add(AsynchAEMessage.Endpoint, currentEndpoint);
- errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
- getAnalysisEngineController().getErrorHandlerChain().handle(e, errorContext,
- getAnalysisEngineController());
- }
}
- public void sendReply(CAS aCas, String anInputCasReferenceId, String aNewCasReferenceId,
- Endpoint anEndpoint, long sequence) throws AsynchAEException {
- try {
- anEndpoint.setReplyEndpoint(true);
- if (anEndpoint.isRemote()) {
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, aNewCasReferenceId, anEndpoint, anEndpoint
- .isRetryEnabled());
- sendCasToRemoteEndpoint(false, serializedCAS, anInputCasReferenceId, aNewCasReferenceId,
- anEndpoint, false, sequence);
- } else {
- // Not supported
- }
- } catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
- } catch (AsynchAEException e) {
- throw e;
- }
-
- catch (Exception e) {
- throw new AsynchAEException(e);
- }
-
- }
public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
try {
@@ -985,232 +794,100 @@ public class JmsOutputChannel implements
}
}
- public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId)
- throws AsynchAEException {
- sendReply(aCommand, anEndpoint, aCasReferenceId, false);
- }
- public void sendReply(int aCommand, Endpoint anEndpoint) throws AsynchAEException {
+ /**
+ * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
+ * full stack)
+ *
+ * @param t
+ * - Throwable to include in the reply message
+ * @param anEndpoint
+ * - an endpoint to receive the reply message
+ * @param aCasReferenceId
+ * - a unique CAS reference id
+ *
+ * @throws AsynchAEException
+ */
+ public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
+ Endpoint anEndpoint, int aCommand) throws AsynchAEException {
anEndpoint.setReplyEndpoint(true);
try {
+ Throwable wrapper = null;
+ if (!(t instanceof UimaEEServiceException)) {
+ // Strip off AsyncAEException and replace with UimaEEServiceException
+ if (t instanceof AsynchAEException && t.getCause() != null) {
+ wrapper = new UimaEEServiceException(t.getCause());
+ } else {
+ wrapper = new UimaEEServiceException(t);
+ }
+ }
if (aborting) {
return;
}
+ anEndpoint.setReplyEndpoint(true);
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ // Create Message that will contain serialized Exception with stack
+ ObjectMessage om = endpointConnection.produceObjectMessage();
+ // Now try to catch non-serializable exception. The Throwable passed into this method may
+ // not be serializable. Catch the exception, and create a wrapper containing stringified
+ // stack trace.
+ try {
+ // serialize the Throwable
+ if (wrapper == null) {
+ om.setObject(t);
+ } else {
+ om.setObject(wrapper);
+ }
+ } catch( RuntimeException e) {
+ // Check if we failed due to non-serializable object in the Throwable
+ if ( e.getCause() != null && e.getCause() instanceof NotSerializableException ) {
+ // stringify the stack trace
+ StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ wrapper = new UimaEEServiceException(sw.toString());
+ // serialize the new wrapper
+ om.setObject(wrapper);
+ } else {
+ throw e; // rethrow
+ }
+ } catch( Exception e) {
+ throw e; // rethrow
+ }
+ // Add common header properties
+ populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
- TextMessage tm = endpointConnection.produceTextMessage("");
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
- populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
+ om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
+ if (aCasReferenceId != null) {
+ om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ if (aParentCasReferenceId != null) {
+ om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
+ }
+ }
- endpointConnection.send(tm, 0, false);
- addIdleTime(tm);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_cpc_reply_sent__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getEndpoint() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
+ // Dispatch Message to destination
+ endpointConnection.send(om, 0, false);
+ addIdleTime(om);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
+ new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
+ }
+ } catch (ServiceShutdownException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- e);
- }
- }
-
- catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
- }
-
- catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
- }
-
- /**
- * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
- * full stack)
- *
- * @param aCasReferenceId
- * - a unique CAS reference id
- * @param anEndpoint
- * - an endpoint to receive the reply message
- *
- * @throws AsynchAEException
- */
- public void sendReply(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
- anEndpoint.setReplyEndpoint(true);
- try {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_replyto_endpoint__FINE",
- new Object[] { anEndpoint.getEndpoint(), aCasReferenceId });
- }
- if (anEndpoint.isRemote()) {
- CacheEntry entry = null;
- try {
- entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
- aCasReferenceId);
-
- } catch (Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_cas_not_found__INFO",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getEndpoint(), aCasReferenceId });
- }
- return;
- }
- if (anEndpoint.getSerializer().equals("xmi")) {
- // Serializes CAS and releases it back to CAS Pool
- String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false);
- sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0);
- } else {
- byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint
- .isRetryEnabled());
- if (binaryCas == null) {
- return;
- }
- sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false);
- }
- } else {
- // Not supported
- }
- } catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
- }
-
- catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
- }
-
- /**
- * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
- * full stack)
- *
- * @param t
- * - Throwable to include in the reply message
- * @param anEndpoint
- * - an endpoint to receive the reply message
- * @param aCasReferenceId
- * - a unique CAS reference id
- *
- * @throws AsynchAEException
- */
- public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
- Endpoint anEndpoint, int aCommand) throws AsynchAEException {
- anEndpoint.setReplyEndpoint(true);
- try {
- Throwable wrapper = null;
- if (!(t instanceof UimaEEServiceException)) {
- // Strip off AsyncAEException and replace with UimaEEServiceException
- if (t instanceof AsynchAEException && t.getCause() != null) {
- wrapper = new UimaEEServiceException(t.getCause());
- } else {
- wrapper = new UimaEEServiceException(t);
- }
- }
- if (aborting) {
- return;
- }
- anEndpoint.setReplyEndpoint(true);
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
- // Create Message that will contain serialized Exception with stack
- ObjectMessage om = endpointConnection.produceObjectMessage();
- // Now try to catch non-serializable exception. The Throwable passed into this method may
- // not be serializable. Catch the exception, and create a wrapper containing stringified
- // stack trace.
- try {
- // serialize the Throwable
- if (wrapper == null) {
- om.setObject(t);
- } else {
- om.setObject(wrapper);
- }
- } catch( RuntimeException e) {
- // Check if we failed due to non-serializable object in the Throwable
- if ( e.getCause() != null && e.getCause() instanceof NotSerializableException ) {
- // stringify the stack trace
- StringWriter sw = new StringWriter();
- t.printStackTrace(new PrintWriter(sw));
- wrapper = new UimaEEServiceException(sw.toString());
- // serialize the new wrapper
- om.setObject(wrapper);
- } else {
- throw e; // rethrow
- }
- } catch( Exception e) {
- throw e; // rethrow
- }
- // Add common header properties
- populateHeaderWithResponseContext(om, anEndpoint, aCommand); // AsynchAEMessage.Process);
-
- om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
- if (aCasReferenceId != null) {
- om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- if (aParentCasReferenceId != null) {
- om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
- }
- }
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- // Dispatch Message to destination
- endpointConnection.send(om, 0, false);
- addIdleTime(om);
- } catch (JMSException e) {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- } catch (ServiceShutdownException e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
}
} catch (AsynchAEException e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -1470,15 +1147,6 @@ public class JmsOutputChannel implements
}
}
- private boolean endpointRetryEnabled(Endpoint[] endpoints) {
- for (int i = 0; i < endpoints.length; i++) {
- if (endpoints[i].isRetryEnabled()) {
- return true;
- }
- }
- return false;
- }
-
private void populateStats(Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId,
int anAdminCommand, boolean isRequest) throws Exception {
if (anEndpoint.isFinal()) {
@@ -1750,9 +1418,10 @@ public class JmsOutputChannel implements
}
}
- private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS,
- String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
- boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
+ private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
+ Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
+ ServiceShutdownException {
+ CasStateEntry casStateEntry = null;
long msgSize = 0;
try {
if (aborting) {
@@ -1762,33 +1431,9 @@ public class JmsOutputChannel implements
// Otherwise this is a request so use a broker specified in the endpoint object.
String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
- CacheEntry entry = this.getCacheEntry(aCasReferenceId);
- if (entry == null) {
- throw new AsynchAEException("Controller:"
- + getAnalysisEngineController().getComponentName()
- + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
- + " CAS:" + aCasReferenceId + " Not In The Cache");
- }
-
- // Get the connection object for a given endpoint
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
- if (endpointConnection == null) {
- throw new AsynchAEException("Controller:"
- + getAnalysisEngineController().getComponentName()
- + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
- + " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
- + " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
- }
- if (!endpointConnection.isOpen()) {
- if (!isRequest) {
- return;
- }
- }
- TextMessage tm = null;
- try {
- // Create empty JMS Text Message
- tm = endpointConnection.produceTextMessage("");
- } catch (AsynchAEException ex) {
+ casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
+ entry.getCasReferenceId());
+ if (casStateEntry == null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
CLASS_NAME.getName(),
@@ -1796,346 +1441,36 @@ public class JmsOutputChannel implements
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_send_reply__WARNING",
new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getDestination(), brokerConnectionURL, anInputCasReferenceId, aCasReferenceId, sequence, ex });
- return;
- }
- // Save Serialized CAS in case we need to re-send it for analysis
- if (anEndpoint.isRetryEnabled()
- && getAnalysisEngineController().getInProcessCache()
- .getSerializedCAS(aCasReferenceId) == null) {
- getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
- aSerializedCAS);
- }
- if (aSerializedCAS != null) {
- msgSize = aSerializedCAS.length();
- }
- tm.setText(aSerializedCAS);
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
- // Add Cas Reference Id to the outgoing JMS Header
- tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-
- // Add common properties to the JMS Header
- if (isRequest == true) {
- populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
- } else {
- populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
- tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
- }
- // The following is true when the analytic is a CAS Multiplier
- if (sequence > 0 && !isRequest) {
- // Override MessageType set in the populateHeaderWithContext above.
- // Make the reply message look like a request. This message will contain a new CAS
- // produced by the CAS Multiplier. The client will treat this CAS
- // differently from the input CAS.
- tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
- // Add a sequence number assigned to this CAS by the controller
- tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
- isRequest = true;
- // Add the name of the FreeCas Queue
- if (freeCASTempQueue != null) {
- // Attach a temp queue to the outgoing message. This a queue where
- // Free CAS notifications need to be sent from the client
- tm.setJMSReplyTo(freeCASTempQueue);
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- if (entry != null) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
- anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
- entry.getInputCasReferenceId() });
- }
- }
- }
- dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
- } catch (JMSException e) {
- // Unable to establish connection to the endpoint. Log it and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- }
-
- catch (ServiceShutdownException e) {
- throw e;
- } catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
-
- }
-
- private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS,
- String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint,
- boolean startTimer, long sequence) throws AsynchAEException, ServiceShutdownException {
- long msgSize = 0;
- try {
- if (aborting) {
+ anEndpoint.getDestination(), brokerConnectionURL,
+ entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
+ entry.getCasReferenceId(), 0,
+ new Exception("Unable to lookup entry in Local Cache for a given Cas Id") });
return;
}
- // If this is a reply to a client, use the same broker URL that manages this service input queue.
- // Otherwise this is a request so use a broker specified in the endpoint object.
- String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
- if (aSerializedCAS != null) {
- msgSize = aSerializedCAS.length;
- }
- CacheEntry entry = this.getCacheEntry(aCasReferenceId);
- if (entry == null) {
- throw new AsynchAEException("Controller:"
- + getAnalysisEngineController().getComponentName()
- + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
- + " CAS:" + aCasReferenceId + " Not In The Cache");
- }
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
- if (endpointConnection == null) {
- throw new AsynchAEException("Controller:"
- + getAnalysisEngineController().getComponentName()
- + " Unable to Send Message To Remote Endpoint: " + anEndpoint.getEndpoint()
- + " Connection is Invalid. InputCasReferenceId:" + anInputCasReferenceId
- + " CasReferenceId:" + aCasReferenceId + " Sequece:" + sequence);
- }
- if (!endpointConnection.isOpen()) {
- if (!isRequest) {
- return;
- }
- }
-
- BytesMessage tm = null;
- try {
- // Create empty JMS Text Message
- tm = endpointConnection.produceByteMessage();
- } catch (AsynchAEException ex) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.WARNING,
- CLASS_NAME.getName(),
- "sendCasToRemoteDelegate",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_unable_to_send_reply__WARNING",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getDestination(), brokerConnectionURL, anInputCasReferenceId, aCasReferenceId, sequence, ex });
- return;
- }
-
- tm.writeBytes(aSerializedCAS);
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
- // Add Cas Reference Id to the outgoing JMS Header
- tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- // Add common properties to the JMS Header
- if (isRequest == true) {
- populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
- } else {
- populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
- tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
- }
- // The following is true when the analytic is a CAS Multiplier
- if (sequence > 0 && !isRequest) {
- // Override MessageType set in the populateHeaderWithContext above.
- // Make the reply message look like a request. This message will contain a new CAS
- // produced by the CAS Multiplier. The client will treat this CAS
- // differently from the input CAS.
- tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
- tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId);
- // Add a sequence number assigned to this CAS by the controller
- tm.setLongProperty(AsynchAEMessage.CasSequence, sequence);
- isRequest = true;
- if (freeCASTempQueue != null) {
- // Attach a temp queue to the outgoing message. This a queue where
- // Free CAS notifications need to be sent from the client
- tm.setJMSReplyTo(freeCASTempQueue);
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- if (entry != null) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
- anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId,
- entry.getInputCasReferenceId() });
- }
- }
- }
- dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
- } catch (JMSException e) {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- }
-
- catch (ServiceShutdownException e) {
- throw e;
- } catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
-
- }
-
- private void sendCasToRemoteEndpoint(boolean isRequest, String aSerializedCAS, CacheEntry entry,
- Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
- ServiceShutdownException {
- CasStateEntry casStateEntry = null;
- long msgSize = 0;
- try {
- if (aborting) {
- return;
- }
- // If this is a reply to a client, use the same broker URL that manages this service input queue.
- // Otherwise this is a request so use a broker specified in the endpoint object.
- String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
- casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
- entry.getCasReferenceId());
- // Get the connection object for a given endpoint
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
- // Create empty JMS Text Message
- TextMessage tm = null;
+ Message tm = null;
try {
- // Create empty JMS Text Message
- tm = endpointConnection.produceTextMessage("");
- } catch (AsynchAEException ex) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.WARNING,
- CLASS_NAME.getName(),
- "sendCasToRemoteDelegate",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_unable_to_send_reply__WARNING",
- new Object[] { getAnalysisEngineController().getComponentName(),
- anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex });
- return;
- }
-
- // Save Serialized CAS in case we need to re-send it for analysis
- if (anEndpoint.isRetryEnabled()
- && getAnalysisEngineController().getInProcessCache().getSerializedCAS(
- entry.getCasReferenceId()) == null) {
- getAnalysisEngineController().getInProcessCache().saveSerializedCAS(
- entry.getCasReferenceId(), aSerializedCAS);
- }
- if (aSerializedCAS != null) {
- msgSize = aSerializedCAS.length();
- }
-
- tm.setText(aSerializedCAS);
- tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
- // Add Cas Reference Id to the outgoing JMS Header
- tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
- // Add common properties to the JMS Header
- if (isRequest == true) {
- populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
- } else {
- populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
- tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
- }
- // The following is true when the analytic is a CAS Multiplier
- if (casStateEntry.isSubordinate() && !isRequest) {
- // Override MessageType set in the populateHeaderWithContext above.
- // Make the reply message look like a request. This message will contain a new CAS
- // produced by the CAS Multiplier. The client will treat this CAS
- // differently from the input CAS.
- tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
-
- isRequest = true;
- // Save the id of the parent CAS
- tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
- .getCasReferenceId()));
- // Add a sequence number assigned to this CAS by the controller
- tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
- // If this is a Cas Multiplier, add a reference to a special queue where
- // the client sends Free Cas Notifications
- if (freeCASTempQueue != null) {
- // Attach a temp queue to the outgoing message. This is a queue where
- // Free CAS notifications need to be sent from the client
- tm.setJMSReplyTo(freeCASTempQueue);
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "sendCasToRemoteEndpoint",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
- new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
- anEndpoint.getEndpoint(), entry.getCasReferenceId(),
- entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
- }
- }
-
- dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
-
- } catch (JMSException e) {
- // Unable to establish connection to the endpoint. Logit and continue
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_unable_to_connect__INFO",
- new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
- }
- }
-
- catch (ServiceShutdownException e) {
- throw e;
- } catch (AsynchAEException e) {
- throw e;
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
-
- }
-
- private void sendCasToRemoteEndpoint(boolean isRequest, byte[] aSerializedCAS, CacheEntry entry,
- Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
- ServiceShutdownException {
- CasStateEntry casStateEntry = null;
- long msgSize = 0;
- try {
- if (aborting) {
- return;
- }
- // If this is a reply to a client, use the same broker URL that manages this service input queue.
- // Otherwise this is a request so use a broker specified in the endpoint object.
- String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
-
- casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
- entry.getCasReferenceId());
- // Get the connection object for a given endpoint
- JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
+ if ( anEndpoint.getSerializer().equals("xmi")) {
+ tm = endpointConnection.produceTextMessage((String)aSerializedCAS);
+ if (aSerializedCAS != null) {
+ msgSize = ((String)aSerializedCAS).length();
+ }
- if (aSerializedCAS != null) {
- msgSize = aSerializedCAS.length;
- }
- if (!endpointConnection.isOpen()) {
- if (!isRequest) {
- return;
+ } else {
+ // Create empty JMS Bytes Message
+ tm = endpointConnection.produceByteMessage((byte[])aSerializedCAS);
+ if (aSerializedCAS != null) {
+ msgSize = ((byte[])aSerializedCAS).length;
+ }
}
- }
- // Create empty JMS Text Message
- BytesMessage tm = null;
- try {
- // Create empty JMS Text Message
- tm = endpointConnection.produceByteMessage();
} catch (AsynchAEException ex) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
@@ -2147,7 +1482,6 @@ public class JmsOutputChannel implements
anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex });
return;
}
- tm.writeBytes(aSerializedCAS);
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
@@ -2158,9 +1492,6 @@ public class JmsOutputChannel implements
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
- if (casStateEntry == null) {
- return;
- }
// The following is true when the analytic is a CAS Multiplier
if (casStateEntry.isSubordinate() && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
@@ -2257,15 +1588,6 @@ public class JmsOutputChannel implements
getAnalysisEngineController().saveReplyTime(t, "");
}
- private CacheEntry getCacheEntry(String aCasReferenceId) throws Exception {
- CacheEntry cacheEntry = null;
- if (getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
- cacheEntry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
- aCasReferenceId);
- }
- return cacheEntry;
- }
-
public void stop() {
stop(Channel.CloseAllChannels);
}
@@ -2343,16 +1665,6 @@ public class JmsOutputChannel implements
}
}
- private String extractURLWithProtocol(String aProtocolList, String aProtocol) {
- StringTokenizer tokenizer = new StringTokenizer(aProtocolList, ",");
- while (tokenizer.hasMoreTokens()) {
- String token = tokenizer.nextToken().trim();
- if (token.toLowerCase().startsWith(aProtocol.toLowerCase())) {
- return token;
- }
- }
- return null;
- }
public void cancelTimers() {
if (connectionMap.size() > 0) {