You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:24 UTC
svn commit: r810547 [2/5] -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Sep 2 15:16:23 2009
@@ -55,772 +55,705 @@
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
- * Thin adapter for receiving JMS messages from Spring. It delegates processing of all
- * messages to the {@link MessageHandler}. Each JMS Message is wrapped in transport neutral
- * MessageContext wrapper.
- *
+ * Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to
+ * the {@link MessageHandler}. Each JMS Message is wrapped in transport neutral MessageContext
+ * wrapper.
+ *
*/
-public class JmsInputChannel
-implements InputChannel, JmsInputChannelMBean, SessionAwareMessageListener
-{
- /**
+public class JmsInputChannel implements InputChannel, JmsInputChannelMBean,
+ SessionAwareMessageListener {
+ /**
*
*/
- private static final long serialVersionUID = -3318400773113552290L;
+ private static final long serialVersionUID = -3318400773113552290L;
+
+ private static final Class CLASS_NAME = JmsInputChannel.class;
+
+ private transient final CountDownLatch msgHandlerLatch = new CountDownLatch(1);
+
+ private transient final CountDownLatch controllerLatch = new CountDownLatch(1);
+
+ // Reference to the first Message Handler in the Chain.
+ private transient Handler handler;
+
+ // The name of the queue this Listener is expecting to receive messages from
+ private String endpointName;
+
+ // Reference to the Controller Object
+ private transient AnalysisEngineController controller;
+
+ private int sessionAckMode;
+
+ private transient UimaDefaultMessageListenerContainer messageListener;
+
+ private transient Session jmsSession;
+
+ private String brokerURL = "";
+
+ private ServiceInfo serviceInfo = null;
+
+ private volatile boolean stopped = false;
+
+ private volatile boolean channelRegistered = false;
+
+ private List listenerContainerList = new ArrayList();
+
+ private Object mux = new Object();
- private static final Class CLASS_NAME = JmsInputChannel.class;
-
- private transient final CountDownLatch msgHandlerLatch = new CountDownLatch(1);
-
- private transient final CountDownLatch controllerLatch = new CountDownLatch(1);
- // Reference to the first Message Handler in the Chain.
- private transient Handler handler;
- // The name of the queue this Listener is expecting to receive messages from
- private String endpointName;
- // Reference to the Controller Object
- private transient AnalysisEngineController controller;
-
- private int sessionAckMode;
-
- private transient UimaDefaultMessageListenerContainer messageListener;
-
- private transient Session jmsSession;
-
- private String brokerURL="";
-
- private ServiceInfo serviceInfo = null;
-
- private volatile boolean stopped = false;
- private volatile boolean channelRegistered = false;
-
- private List listenerContainerList = new ArrayList();
-
- private Object mux = new Object();
-
- private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap =
- new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
-
- public AnalysisEngineController getController()
- {
- return controller;
- }
-
- public String getName()
- {
- return endpointName;
- }
-
- public void setController(AnalysisEngineController aController) throws Exception
- {
-
- this.controller = aController;
- if ( !channelRegistered )
- {
- controller.addInputChannel(this);
- }
- controller.setInputChannel(this);
- controllerLatch.countDown();
- }
-
- public void setMessageHandler( Handler aHandler )
- {
- handler = aHandler;
- msgHandlerLatch.countDown();
- }
-
- public void setEndpointName( String anEndpointName )
- {
- endpointName = anEndpointName;
+ private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap = new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
+
+ public AnalysisEngineController getController() {
+ return controller;
+ }
+
+ public String getName() {
+ return endpointName;
+ }
+
+ public void setController(AnalysisEngineController aController) throws Exception {
+
+ this.controller = aController;
+ if (!channelRegistered) {
+ controller.addInputChannel(this);
+ }
+ controller.setInputChannel(this);
+ controllerLatch.countDown();
+ }
+
+ public void setMessageHandler(Handler aHandler) {
+ handler = aHandler;
+ msgHandlerLatch.countDown();
+ }
+
+ public void setEndpointName(String anEndpointName) {
+ endpointName = anEndpointName;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "setEndpointName", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_service_listening__INFO",
- new Object[] { anEndpointName });
-
- }
- }
-
- /**
- * Validate message type contained in the JMS header.
- *
- * @param aMessage - jms message retrieved from queue
- * @param properties - map containing message properties
- * @return
- * @throws Exception
- */
-
- private boolean validMessageType( Message aMessage, Map properties ) throws Exception
- {
- if ( properties.containsKey(AsynchAEMessage.MessageType))
- {
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- if ( msgType != AsynchAEMessage.Response && msgType != AsynchAEMessage.Request )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_msgtype_in_message__INFO",
- new Object[] { msgType, endpointName });
- }
- return false;
-
- }
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msgtype_notin_message__INFO",
- new Object[] { endpointName });
- }
- return false;
- }
-
- return true;
- }
-
- private boolean isProcessRequest( Message aMessage ) throws Exception
- {
- Map properties = ((ActiveMQMessage)aMessage).getProperties();
- if ( properties.containsKey(AsynchAEMessage.MessageType) &&
- properties.containsKey(AsynchAEMessage.Command) )
- {
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
-
- if (msgType != AsynchAEMessage.Request || command != AsynchAEMessage.Process )
- {
- return false;
- }
- return true;
- }
- return false;
- }
- private boolean isRemoteRequest( Message aMessage ) throws Exception
- {
-
- // Dont do checkpoints if a message was sent from a Cas Multiplier
- if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
- {
- return false;
- }
-
- Map properties = ((ActiveMQMessage)aMessage).getProperties();
- if ( properties.containsKey(AsynchAEMessage.MessageType) &&
- properties.containsKey(AsynchAEMessage.Command) &&
- properties.containsKey(UIMAMessage.ServerURI))
- {
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- boolean isRemote = aMessage.getStringProperty(UIMAMessage.ServerURI).startsWith("vm") == false;
- if ( isRemote && msgType == AsynchAEMessage.Request &&
- (command == AsynchAEMessage.Process ||
- command == AsynchAEMessage.CollectionProcessComplete) )
- {
- return true;
- }
- }
- return false;
- }
-
- private boolean acceptsDeltaCas( Message aMessage ) throws Exception
- {
- Map properties = ((ActiveMQMessage)aMessage).getProperties();
- boolean acceptsDeltaCas = false;
- if ( properties.containsKey(AsynchAEMessage.AcceptsDeltaCas) )
- {
- acceptsDeltaCas = aMessage.getBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
- }
- return acceptsDeltaCas;
- }
-
- /**
- * Validate command contained in the header of the JMS Message
- *
- * @param aMessage - JMS Message received
- * @param properties - Map containing header properties
- * @return - true if the command received is a valid one, false otherwise
- * @throws Exception
- */
- private boolean validCommand( Message aMessage, Map properties ) throws Exception
- {
- if ( properties.containsKey(AsynchAEMessage.Command))
- {
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- if ( command != AsynchAEMessage.Process &&
- command != AsynchAEMessage.GetMeta &&
- command != AsynchAEMessage.ReleaseCAS &&
- command != AsynchAEMessage.Stop &&
- command != AsynchAEMessage.Ping &&
- command != AsynchAEMessage.ServiceInfo &&
- command != AsynchAEMessage.CollectionProcessComplete )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validCommand", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_command_in_message__INFO",
- new Object[] { command, endpointName });
- }
- return false;
- }
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validCommand", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_command_notin_message__INFO",
- new Object[] { endpointName });
- }
- return false;
- }
-
- return true;
- }
- /**
- * Validates payload in the JMS Message.
- *
- * @param aMessage - JMS Message received
- * @param properties - Map containing header properties
- * @return - true if the payload is valid, false otherwise
- * @throws Exception
- */
- private boolean validPayload( Message aMessage, Map properties ) throws Exception
- {
- if ( properties.containsKey(AsynchAEMessage.Command))
- {
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- if ( command == AsynchAEMessage.GetMeta ||
- command == AsynchAEMessage.CollectionProcessComplete ||
- command == AsynchAEMessage.Stop ||
- command == AsynchAEMessage.Ping ||
- command == AsynchAEMessage.ServiceInfo ||
- command == AsynchAEMessage.ReleaseCAS)
- {
- // Payload not included in GetMeta Request
- return true;
- }
- }
-
- if ( properties.containsKey(AsynchAEMessage.Payload))
- {
- int payload = aMessage.getIntProperty(AsynchAEMessage.Payload);
- if ( payload != AsynchAEMessage.XMIPayload &&
- payload != AsynchAEMessage.BinaryPayload &&
- payload != AsynchAEMessage.CASRefID &&
- payload != AsynchAEMessage.Exception &&
- payload != AsynchAEMessage.Metadata
- )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validPayload", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_payload_in_message__INFO",
- new Object[] { payload, endpointName });
- }
-
- return false;
- }
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validPayload", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_payload_notin_message__INFO",
- new Object[] { endpointName });
- }
- return false;
- }
-
- return true;
- }
-
- private boolean isStaleMessage( Message aMessage ) throws JMSException
- {
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- if ( isStopped() || getController() == null || getController().getInProcessCache() == null )
- {
- // Shutting down
- return true;
- }
- if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
- {
- String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
- if (!getController().getInProcessCache().entryExists(casReferenceId))
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "isStaleMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stale_message__FINE",
- new Object[] { endpointName, casReferenceId, aMessage.getStringProperty(AsynchAEMessage.MessageFrom) });
- }
- return true;
- }
- }
- return false;
- }
- /**
- * Validates contents of the message. It checks if command, payload and message types contain
- * valid data.
- *
- * @param aMessage - JMS Message to validate
- * @return - true if message is valid, false otherwise
- * @throws Exception
- */
- public boolean validMessage( Message aMessage ) throws Exception
- {
- if ( aMessage instanceof ActiveMQMessage )
- {
- Map properties = ((ActiveMQMessage)aMessage).getProperties();
- if ( !validMessageType(aMessage, properties) )
- {
- int msgType = 0;
- if ( properties.containsKey(AsynchAEMessage.MessageType))
- {
- msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- }
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_msg_type__INFO",
- new Object[] { getController().getComponentName(), msgType });
- }
- return false;
- }
- if ( !validCommand(aMessage, properties) )
- {
+ "setEndpointName", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_service_listening__INFO", new Object[] { anEndpointName });
+
+ }
+ }
+
+ /**
+ * Validate message type contained in the JMS header.
+ *
+ * @param aMessage
+ * - jms message retrieved from queue
+ * @param properties
+ * - map containing message properties
+ * @return
+ * @throws Exception
+ */
+
+ private boolean validMessageType(Message aMessage, Map properties) throws Exception {
+ if (properties.containsKey(AsynchAEMessage.MessageType)) {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ if (msgType != AsynchAEMessage.Response && msgType != AsynchAEMessage.Request) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_msgtype_in_message__INFO",
+ new Object[] { msgType, endpointName });
+ }
+ return false;
+
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validMessageType", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_msgtype_notin_message__INFO", new Object[] { endpointName });
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean isProcessRequest(Message aMessage) throws Exception {
+ Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ if (properties.containsKey(AsynchAEMessage.MessageType)
+ && properties.containsKey(AsynchAEMessage.Command)) {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+
+ if (msgType != AsynchAEMessage.Request || command != AsynchAEMessage.Process) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isRemoteRequest(Message aMessage) throws Exception {
+
+ // Dont do checkpoints if a message was sent from a Cas Multiplier
+ if (aMessage.propertyExists(AsynchAEMessage.CasSequence)) {
+ return false;
+ }
+
+ Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ if (properties.containsKey(AsynchAEMessage.MessageType)
+ && properties.containsKey(AsynchAEMessage.Command)
+ && properties.containsKey(UIMAMessage.ServerURI)) {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ boolean isRemote = aMessage.getStringProperty(UIMAMessage.ServerURI).startsWith("vm") == false;
+ if (isRemote
+ && msgType == AsynchAEMessage.Request
+ && (command == AsynchAEMessage.Process || command == AsynchAEMessage.CollectionProcessComplete)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean acceptsDeltaCas(Message aMessage) throws Exception {
+ Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ boolean acceptsDeltaCas = false;
+ if (properties.containsKey(AsynchAEMessage.AcceptsDeltaCas)) {
+ acceptsDeltaCas = aMessage.getBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+ }
+ return acceptsDeltaCas;
+ }
+
+ /**
+ * Validate command contained in the header of the JMS Message
+ *
+ * @param aMessage
+ * - JMS Message received
+ * @param properties
+ * - Map containing header properties
+ * @return - true if the command received is a valid one, false otherwise
+ * @throws Exception
+ */
+ private boolean validCommand(Message aMessage, Map properties) throws Exception {
+ if (properties.containsKey(AsynchAEMessage.Command)) {
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ if (command != AsynchAEMessage.Process && command != AsynchAEMessage.GetMeta
+ && command != AsynchAEMessage.ReleaseCAS && command != AsynchAEMessage.Stop
+ && command != AsynchAEMessage.Ping && command != AsynchAEMessage.ServiceInfo
+ && command != AsynchAEMessage.CollectionProcessComplete) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validCommand", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_command_in_message__INFO",
+ new Object[] { command, endpointName });
+ }
+ return false;
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "validCommand",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_command_notin_message__INFO",
+ new Object[] { endpointName });
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Validates payload in the JMS Message.
+ *
+ * @param aMessage
+ * - JMS Message received
+ * @param properties
+ * - Map containing header properties
+ * @return - true if the payload is valid, false otherwise
+ * @throws Exception
+ */
+ private boolean validPayload(Message aMessage, Map properties) throws Exception {
+ if (properties.containsKey(AsynchAEMessage.Command)) {
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ if (command == AsynchAEMessage.GetMeta
+ || command == AsynchAEMessage.CollectionProcessComplete
+ || command == AsynchAEMessage.Stop || command == AsynchAEMessage.Ping
+ || command == AsynchAEMessage.ServiceInfo || command == AsynchAEMessage.ReleaseCAS) {
+ // Payload not included in GetMeta Request
+ return true;
+ }
+ }
+
+ if (properties.containsKey(AsynchAEMessage.Payload)) {
+ int payload = aMessage.getIntProperty(AsynchAEMessage.Payload);
+ if (payload != AsynchAEMessage.XMIPayload && payload != AsynchAEMessage.BinaryPayload
+ && payload != AsynchAEMessage.CASRefID && payload != AsynchAEMessage.Exception
+ && payload != AsynchAEMessage.Metadata) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validPayload", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_payload_in_message__INFO",
+ new Object[] { payload, endpointName });
+ }
+
+ return false;
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "validPayload",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_payload_notin_message__INFO",
+ new Object[] { endpointName });
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean isStaleMessage(Message aMessage) throws JMSException {
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ if (isStopped() || getController() == null || getController().getInProcessCache() == null) {
+ // Shutting down
+ return true;
+ }
+ if (command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response) {
+ String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
+ if (!getController().getInProcessCache().entryExists(casReferenceId)) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "isStaleMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stale_message__FINE",
+ new Object[] { endpointName, casReferenceId,
+ aMessage.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Validates contents of the message. It checks if command, payload and message types contain
+ * valid data.
+ *
+ * @param aMessage
+ * - JMS Message to validate
+ * @return - true if message is valid, false otherwise
+ * @throws Exception
+ */
+ public boolean validMessage(Message aMessage) throws Exception {
+ if (aMessage instanceof ActiveMQMessage) {
+ Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ if (!validMessageType(aMessage, properties)) {
+ int msgType = 0;
+ if (properties.containsKey(AsynchAEMessage.MessageType)) {
+ msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_msg_type__INFO",
+ new Object[] { getController().getComponentName(), msgType });
+ }
+ return false;
+ }
+ if (!validCommand(aMessage, properties)) {
int command = 0;
- if ( properties.containsKey(AsynchAEMessage.Command))
- {
+ if (properties.containsKey(AsynchAEMessage.Command)) {
command = aMessage.getIntProperty(AsynchAEMessage.Command);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_cmd_type__INFO",
- new Object[] { getController().getComponentName(), command });
+ "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_cmd_type__INFO",
+ new Object[] { getController().getComponentName(), command });
}
- return false;
- }
- if ( !validPayload(aMessage, properties) )
- {
+ return false;
+ }
+ if (!validPayload(aMessage, properties)) {
int payload = 0;
- if ( properties.containsKey(AsynchAEMessage.Payload))
- {
+ if (properties.containsKey(AsynchAEMessage.Payload)) {
payload = aMessage.getIntProperty(AsynchAEMessage.Payload);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_payload_type__INFO",
- new Object[] { getController().getComponentName(), payload });
+ "validMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_invalid_payload_type__INFO",
+ new Object[] { getController().getComponentName(), payload });
}
- return false;
- }
-
- if ( isStaleMessage(aMessage) )
- {
- if ( sessionAckMode == Session.CLIENT_ACKNOWLEDGE)
- {
- aMessage.acknowledge();
- }
- return false;
- }
- }
- return true;
- }
- public void abort()
- {
- }
- private String decodeIntToString( String aTypeToDecode, int aValueToDecode )
- {
- if ( AsynchAEMessage.MessageType.equals(aTypeToDecode) )
- {
- switch( aValueToDecode )
- {
- case AsynchAEMessage.Request:
- return "Request";
- case AsynchAEMessage.Response:
- return "Response";
- }
- }
- else if ( AsynchAEMessage.Command.equals(aTypeToDecode ) )
- {
- switch( aValueToDecode )
- {
- case AsynchAEMessage.Process:
- return "Process";
- case AsynchAEMessage.GetMeta:
- return "GetMetadata";
- case AsynchAEMessage.CollectionProcessComplete:
- return "CollectionProcessComplete";
- case AsynchAEMessage.ReleaseCAS:
- return "ReleaseCAS";
- case AsynchAEMessage.Stop:
- return "Stop";
- case AsynchAEMessage.Ping:
- return "Ping";
- case AsynchAEMessage.ServiceInfo:
- return "ServiceInfo";
- }
-
- }
- else if ( AsynchAEMessage.Payload.equals(aTypeToDecode ) )
- {
- switch( aValueToDecode )
- {
- case AsynchAEMessage.XMIPayload:
- return "XMIPayload";
- case AsynchAEMessage.BinaryPayload:
- return "BinaryPayload";
- case AsynchAEMessage.CASRefID:
- return "CASRefID";
- case AsynchAEMessage.Metadata:
- return "Metadata";
- case AsynchAEMessage.Exception:
- return "Exception";
- case AsynchAEMessage.XCASPayload:
- return "XCASPayload";
- case AsynchAEMessage.None:
- return "None";
- }
- }
- return "UNKNOWN";
- }
-
- private boolean ackMessageNow( Message aMessage ) throws JMSException
- {
- if ( sessionAckMode != Session.CLIENT_ACKNOWLEDGE )
- {
- return false;
- }
-
-
- if ( aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.GetMeta ||
- aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete ||
- aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.ReleaseCAS ||
- aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.ACK )
- {
- return true;
- }
- return false;
- }
-
- private boolean isCheckpointWorthy( Message aMessage ) throws Exception
- {
- synchronized( mux )
- {
- // Dont do checkpoints if a message was sent from a Cas Multiplier
- if ( aMessage.propertyExists(AsynchAEMessage.CasSequence))
- {
- return false;
- }
- Map properties = ((ActiveMQMessage)aMessage).getProperties();
- if ( properties.containsKey(AsynchAEMessage.MessageType) &&
- properties.containsKey(AsynchAEMessage.Command) &&
- properties.containsKey(UIMAMessage.ServerURI))
- {
- int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
- int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- if ( msgType == AsynchAEMessage.Request &&
- (command == AsynchAEMessage.Process ||
- command == AsynchAEMessage.CollectionProcessComplete) )
- {
- return true;
- }
- }
- return false;
-
- }
- }
- /**
- * Receives Messages from the JMS Provider. It checks the message header
- * to determine the type of message received. Based on the type,
- * a MessageContext is created to facilitate access to the transport specific message.
- * Once the MessageContext is determined this routine delegates handling
- * of the message to the chain of MessageHandlers.
- *
- * @param aMessage - JMS Message containing header and payload
- * @param aSession - JMSSession object
- */
- public void onMessage(Message aMessage, Session aJmsSession )
- {
- String casRefId = null;
-
- if ( isStopped() )
- {
- return;
- }
-
-
- try
- {
- // wait until message handlers are plugged in
- msgHandlerLatch.await();
- }
- catch( InterruptedException e) {}
- try
- {
- // wait until the controller is plugged in
- controllerLatch.await();
- }
- catch( InterruptedException e) {}
- long idleTime = 0;
-
-
-
- boolean doCheckpoint = false;
-
- String eN = endpointName;
- if ( getController() != null )
- {
- eN = getController().getComponentName();
- if (eN == null )
- {
- eN = "";
- }
- }
-
+ return false;
+ }
+
+ if (isStaleMessage(aMessage)) {
+ if (sessionAckMode == Session.CLIENT_ACKNOWLEDGE) {
+ aMessage.acknowledge();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void abort() {
+ }
+
+ private String decodeIntToString(String aTypeToDecode, int aValueToDecode) {
+ if (AsynchAEMessage.MessageType.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.Request:
+ return "Request";
+ case AsynchAEMessage.Response:
+ return "Response";
+ }
+ } else if (AsynchAEMessage.Command.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.Process:
+ return "Process";
+ case AsynchAEMessage.GetMeta:
+ return "GetMetadata";
+ case AsynchAEMessage.CollectionProcessComplete:
+ return "CollectionProcessComplete";
+ case AsynchAEMessage.ReleaseCAS:
+ return "ReleaseCAS";
+ case AsynchAEMessage.Stop:
+ return "Stop";
+ case AsynchAEMessage.Ping:
+ return "Ping";
+ case AsynchAEMessage.ServiceInfo:
+ return "ServiceInfo";
+ }
+
+ } else if (AsynchAEMessage.Payload.equals(aTypeToDecode)) {
+ switch (aValueToDecode) {
+ case AsynchAEMessage.XMIPayload:
+ return "XMIPayload";
+ case AsynchAEMessage.BinaryPayload:
+ return "BinaryPayload";
+ case AsynchAEMessage.CASRefID:
+ return "CASRefID";
+ case AsynchAEMessage.Metadata:
+ return "Metadata";
+ case AsynchAEMessage.Exception:
+ return "Exception";
+ case AsynchAEMessage.XCASPayload:
+ return "XCASPayload";
+ case AsynchAEMessage.None:
+ return "None";
+ }
+ }
+ return "UNKNOWN";
+ }
+
+ private boolean ackMessageNow(Message aMessage) throws JMSException {
+ if (sessionAckMode != Session.CLIENT_ACKNOWLEDGE) {
+ return false;
+ }
+
+ if (aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.GetMeta
+ || aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.CollectionProcessComplete
+ || aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.ReleaseCAS
+ || aMessage.getIntProperty(AsynchAEMessage.Command) == AsynchAEMessage.ACK) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isCheckpointWorthy(Message aMessage) throws Exception {
+ synchronized (mux) {
+ // Dont do checkpoints if a message was sent from a Cas Multiplier
+ if (aMessage.propertyExists(AsynchAEMessage.CasSequence)) {
+ return false;
+ }
+ Map properties = ((ActiveMQMessage) aMessage).getProperties();
+ if (properties.containsKey(AsynchAEMessage.MessageType)
+ && properties.containsKey(AsynchAEMessage.Command)
+ && properties.containsKey(UIMAMessage.ServerURI)) {
+ int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
+ int command = aMessage.getIntProperty(AsynchAEMessage.Command);
+ if (msgType == AsynchAEMessage.Request
+ && (command == AsynchAEMessage.Process || command == AsynchAEMessage.CollectionProcessComplete)) {
+ return true;
+ }
+ }
+ return false;
+
+ }
+ }
+
+ /**
+ * Receives Messages from the JMS Provider. It checks the message header to determine the type of
+ * message received. Based on the type, a MessageContext is created to facilitate access to the
+ * transport specific message. Once the MessageContext is determined this routine delegates
+ * handling of the message to the chain of MessageHandlers.
+ *
+ * @param aMessage
+ * - JMS Message containing header and payload
+ * @param aSession
+ * - JMSSession object
+ */
+ public void onMessage(Message aMessage, Session aJmsSession) {
+ String casRefId = null;
+
+ if (isStopped()) {
+ return;
+ }
+
+ try {
+ // wait until message handlers are plugged in
+ msgHandlerLatch.await();
+ } catch (InterruptedException e) {
+ }
+ try {
+ // wait until the controller is plugged in
+ controllerLatch.await();
+ } catch (InterruptedException e) {
+ }
+ long idleTime = 0;
+
+ boolean doCheckpoint = false;
+
+ String eN = endpointName;
+ if (getController() != null) {
+ eN = getController().getComponentName();
+ if (eN == null) {
+ eN = "";
+ }
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE",
- new Object[] { eN });
- }
- JmsMessageContext messageContext = null;
-
- int requestType = 0;
- try
- {
- // Wrap JMS Message in MessageContext
- messageContext = new JmsMessageContext( aMessage, endpointName );
- if ( aMessage.getStringProperty(AsynchAEMessage.CasReference) == null )
- {
- casRefId = "CasReferenceId Not In Message";
- }
- else
- {
- casRefId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
- }
- if ( validMessage(aMessage) )
- {
- String command = decodeIntToString(AsynchAEMessage.Command, aMessage.getIntProperty(AsynchAEMessage.Command) );
- // Request or Response
- String messageType = decodeIntToString(AsynchAEMessage.MessageType, aMessage.getIntProperty(AsynchAEMessage.MessageType) );
- // Check if the request message contains a reply destination known to have
- // been deleted. This could be the case if the client has been shutdown after
- // sending requests. Previous attempt to deliver a reply failed and the client's
- // reply endpoint was placed on the DoNotProcess List. For optimization, all
- // requests from a dead client will be dropped.
- if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request &&
- messageContext != null &&
- messageContext.getEndpoint() != null &&
- messageContext.getEndpoint().getDestination() != null &&
- getController().isEndpointOnDontProcessList(messageContext.getEndpoint().getDestination().toString())) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE", new Object[] { eN });
+ }
+ JmsMessageContext messageContext = null;
+
+ int requestType = 0;
+ try {
+ // Wrap JMS Message in MessageContext
+ messageContext = new JmsMessageContext(aMessage, endpointName);
+ if (aMessage.getStringProperty(AsynchAEMessage.CasReference) == null) {
+ casRefId = "CasReferenceId Not In Message";
+ } else {
+ casRefId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
+ }
+ if (validMessage(aMessage)) {
+ String command = decodeIntToString(AsynchAEMessage.Command, aMessage
+ .getIntProperty(AsynchAEMessage.Command));
+ // Request or Response
+ String messageType = decodeIntToString(AsynchAEMessage.MessageType, aMessage
+ .getIntProperty(AsynchAEMessage.MessageType));
+ // Check if the request message contains a reply destination known to have
+ // been deleted. This could be the case if the client has been shutdown after
+ // sending requests. Previous attempt to deliver a reply failed and the client's
+ // reply endpoint was placed on the DoNotProcess List. For optimization, all
+ // requests from a dead client will be dropped.
+ if (aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
+ && messageContext != null
+ && messageContext.getEndpoint() != null
+ && messageContext.getEndpoint().getDestination() != null
+ && getController().isEndpointOnDontProcessList(
+ messageContext.getEndpoint().getDestination().toString())) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dropping_msg_client_is_dead__INFO",
- new Object[] { controller.getComponentName(),messageContext.getEndpoint().getDestination(), casRefId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_dropping_msg_client_is_dead__INFO",
+ new Object[] { controller.getComponentName(),
+ messageContext.getEndpoint().getDestination(), casRefId });
}
return;
}
-
- if ( ackMessageNow(aMessage))
- {
- aMessage.acknowledge();
- }
- String msgSentFromIP = null;
-
- if ( aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response &&
- aMessage.propertyExists(AsynchAEMessage.ServerIP))
- {
- msgSentFromIP = aMessage.getStringProperty(AsynchAEMessage.ServerIP);
- }
-// System.out.println("***********************************************************************************" +
-// " \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************" +
-// " \n**MSGTYPE::"+messageType+" COMMAND:"+command + " Cas Reference Id::"+casRefId+
-// " \n******************************************************************************");
-
- String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
-
- if ( controller != null && msgFrom != null )
- {
- if ( msgSentFromIP != null )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_new_message_with_ip__FINE",
- new Object[] { controller.getComponentName(), msgFrom, msgSentFromIP, messageType, command, casRefId });
- }
- }
- else
- {
+ if (ackMessageNow(aMessage)) {
+ aMessage.acknowledge();
+ }
+ String msgSentFromIP = null;
+
+ if (aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Response
+ && aMessage.propertyExists(AsynchAEMessage.ServerIP)) {
+ msgSentFromIP = aMessage.getStringProperty(AsynchAEMessage.ServerIP);
+ }
+ // System.out.println("***********************************************************************************"
+ // +
+ // " \n**CONTROLLER::"+controller.getName()+"**** Received New Message From [ "+aMessage.getStringProperty(AsynchAEMessage.MessageFrom)+" ]**************"
+ // +
+ // " \n**MSGTYPE::"+messageType+" COMMAND:"+command +
+ // " Cas Reference Id::"+casRefId+
+ // " \n******************************************************************************");
+
+ String msgFrom = (String) aMessage.getStringProperty(AsynchAEMessage.MessageFrom);
+
+ if (controller != null && msgFrom != null) {
+ if (msgSentFromIP != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_recvd_new_message_with_ip__FINE",
+ new Object[] { controller.getComponentName(), msgFrom, msgSentFromIP,
+ messageType, command, casRefId });
+ }
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_new_message__FINE",
- new Object[] { controller.getComponentName(), msgFrom, messageType, command, casRefId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_recvd_new_message__FINE",
+ new Object[] { controller.getComponentName(), msgFrom, messageType, command,
+ casRefId });
}
- }
- }
- else
- {
- }
- // Delegate processing of the message contained in the MessageContext to the
- // chain of handlers
- try
- {
- if ( isRemoteRequest( aMessage ))
- {
- // Compute the idle time waiting for this request
- idleTime = getController().getIdleTime();
-
- // This idle time is reported to the client thus save it in the endpoint
- // object. This value will be fetched and added to the outgoing reply.
- messageContext.getEndpoint().setIdleTime(idleTime);
- }
- }
- catch( Exception e ) {}
-
- // Determine if this message is a request and either GetMeta, CPC, or Process
- doCheckpoint = isCheckpointWorthy( aMessage );
- requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
- // Checkpoint
- if ( doCheckpoint )
- {
- getController().beginProcess(requestType);
- }
+ }
+ } else {
+ }
+ // Delegate processing of the message contained in the MessageContext to the
+ // chain of handlers
+ try {
+ if (isRemoteRequest(aMessage)) {
+ // Compute the idle time waiting for this request
+ idleTime = getController().getIdleTime();
+
+ // This idle time is reported to the client thus save it in the endpoint
+ // object. This value will be fetched and added to the outgoing reply.
+ messageContext.getEndpoint().setIdleTime(idleTime);
+ }
+ } catch (Exception e) {
+ }
+
+ // Determine if this message is a request and either GetMeta, CPC, or Process
+ doCheckpoint = isCheckpointWorthy(aMessage);
+ requestType = aMessage.getIntProperty(AsynchAEMessage.Command);
+ // Checkpoint
+ if (doCheckpoint) {
+ getController().beginProcess(requestType);
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_new_msg_in__FINEST",
- new Object[] { getController().getComponentName(), msgFrom, command, messageType, casRefId });
- }
- if ( handler != null )
- {
- handler.handle( messageContext );
- }
- }
- else
- {
- if ( !isStaleMessage(aMessage))
- {
- controller.getErrorHandlerChain().handle(new InvalidMessageException(), HandlerBase.populateErrorContext( messageContext ), controller);
- }
- }
-
- }
- catch( Throwable t)
- {
- t.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_new_msg_in__FINEST",
+ new Object[] { getController().getComponentName(), msgFrom, command, messageType,
+ casRefId });
+ }
+ if (handler != null) {
+ handler.handle(messageContext);
+ }
+ } else {
+ if (!isStaleMessage(aMessage)) {
+ controller.getErrorHandlerChain().handle(new InvalidMessageException(),
+ HandlerBase.populateErrorContext(messageContext), controller);
+ }
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", t);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", t);
+ }
+ controller.getErrorHandlerChain().handle(t, HandlerBase.populateErrorContext(messageContext),
+ controller);
+ } finally {
+ // Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed
+ // if we are here
+ if (doCheckpoint && getController() instanceof PrimitiveAnalysisEngineController) {
+ getController().endProcess(requestType);
}
- controller.getErrorHandlerChain().handle(t, HandlerBase.populateErrorContext( messageContext ), controller);
- }
- finally
- {
- // Call the end checkpoint for non-aggregates. For primitives the CAS has been fully processed if we are here
- if ( doCheckpoint && getController() instanceof PrimitiveAnalysisEngineController )
- {
- getController().endProcess(requestType);
- }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_processed__FINE",
- new Object[] { getController().getComponentName(), casRefId });
- }
- }
- }
-
- public int getSessionAckMode()
- {
- return sessionAckMode;
- }
- public String getServerUri()
- {
- return brokerURL;
- }
- public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener)
- {
- this.messageListener = messageListener;
- System.setProperty("BrokerURI", messageListener.getBrokerUrl());
- brokerURL = messageListener.getBrokerUrl();
- if ( !listenerContainerList.contains(messageListener) ) {
- listenerContainerList.add(messageListener);
- }
- if ( getController() != null )
- {
- try
- {
- getController().addInputChannel(this);
- messageListener.setController(getController());
- } catch( Exception e) {}
- }
- }
- public ActiveMQConnectionFactory getConnectionFactory()
- {
- if (messageListener == null )
- {
- return null;
- }
- else
- {
- return (ActiveMQConnectionFactory)messageListener.getConnectionFactory();
- }
- }
- public void ackMessage( MessageContext aMessageContext )
- {
- if ( aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE )
- {
- try
- {
- ((Message)aMessageContext.getRawMessage()).acknowledge();
- }
- catch( Exception e)
- {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_msg_processed__FINE",
+ new Object[] { getController().getComponentName(), casRefId });
+ }
+ }
+ }
+
+ public int getSessionAckMode() {
+ return sessionAckMode;
+ }
+
+ public String getServerUri() {
+ return brokerURL;
+ }
+
+ public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener) {
+ this.messageListener = messageListener;
+ System.setProperty("BrokerURI", messageListener.getBrokerUrl());
+ brokerURL = messageListener.getBrokerUrl();
+ if (!listenerContainerList.contains(messageListener)) {
+ listenerContainerList.add(messageListener);
+ }
+ if (getController() != null) {
+ try {
+ getController().addInputChannel(this);
+ messageListener.setController(getController());
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ public ActiveMQConnectionFactory getConnectionFactory() {
+ if (messageListener == null) {
+ return null;
+ } else {
+ return (ActiveMQConnectionFactory) messageListener.getConnectionFactory();
+ }
+ }
+
+ public void ackMessage(MessageContext aMessageContext) {
+ if (aMessageContext != null && sessionAckMode == Session.CLIENT_ACKNOWLEDGE) {
+ try {
+ ((Message) aMessageContext.getRawMessage()).acknowledge();
+ } catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "ackMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
- new Object[] { Thread.currentThread().getName(), e });
+ "ackMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
+ new Object[] { Thread.currentThread().getName(), e });
}
- }
- }
- }
- public String getBrokerURL()
- {
- return brokerURL;
- }
-
- public String getInputQueueName()
- {
- if ( messageListener != null )
- if ( messageListener.getDestination() != null ) {
- return messageListener.getDestination().toString();
- } else {
- return messageListener.getDestinationName();//getEndpointName();
- }
- else
- {
- return "";
- }
- }
- public ServiceInfo getServiceInfo()
- {
- if ( serviceInfo == null )
- {
- serviceInfo = new ServiceInfo();
- serviceInfo.setBrokerURL(getBrokerURL());
- serviceInfo.setInputQueueName(getName());
- serviceInfo.setState("Active");
- }
- return serviceInfo;
- }
- public void setServerUri(String serverUri)
- {
- brokerURL = serverUri;
- if ( getController() != null && getController() instanceof AggregateAnalysisEngineController )
- {
- ((AggregateAnalysisEngineController)getController()).getServiceInfo().setBrokerURL(brokerURL);
- }
- else
- {
- ((PrimitiveAnalysisEngineController)getController()).getServiceInfo().setBrokerURL(brokerURL);
- }
- }
+ }
+ }
+ }
+
+ public String getBrokerURL() {
+ return brokerURL;
+ }
+
+ public String getInputQueueName() {
+ if (messageListener != null)
+ if (messageListener.getDestination() != null) {
+ return messageListener.getDestination().toString();
+ } else {
+ return messageListener.getDestinationName();// getEndpointName();
+ }
+ else {
+ return "";
+ }
+ }
+
+ public ServiceInfo getServiceInfo() {
+ if (serviceInfo == null) {
+ serviceInfo = new ServiceInfo();
+ serviceInfo.setBrokerURL(getBrokerURL());
+ serviceInfo.setInputQueueName(getName());
+ serviceInfo.setState("Active");
+ }
+ return serviceInfo;
+ }
+
+ public void setServerUri(String serverUri) {
+ brokerURL = serverUri;
+ if (getController() != null && getController() instanceof AggregateAnalysisEngineController) {
+ ((AggregateAnalysisEngineController) getController()).getServiceInfo()
+ .setBrokerURL(brokerURL);
+ } else {
+ ((PrimitiveAnalysisEngineController) getController()).getServiceInfo()
+ .setBrokerURL(brokerURL);
+ }
+ }
+
private void stopChannel(UimaDefaultMessageListenerContainer mL) throws Exception {
String eName = mL.getEndpointName();
if (eName != null) {
@@ -831,49 +764,52 @@
}
}
mL.stop();
-
+
String selector = "";
- if ( mL.getMessageSelector() != null ) {
- selector = " Selector:"+mL.getMessageSelector();
+ if (mL.getMessageSelector() != null) {
+ selector = " Selector:" + mL.getMessageSelector();
}
- System.out.println("Service:"+getController().getComponentName()+" Message Channel:"+mL.getDestination()+selector+" Stopped");
+ System.out.println("Service:" + getController().getComponentName() + " Message Channel:"
+ + mL.getDestination() + selector + " Stopped");
}
+
private boolean doCloseChannel(UimaDefaultMessageListenerContainer mL, int channelsToClose) {
- // Check if we are closing just the input channel
- if ( channelsToClose == InputChannel.InputChannels ) {
- // Fetch the listener object
- ActiveMQDestination destination = (ActiveMQDestination)mL.getListenerEndpoint();
- // if this is a listener on a temp queue return false. We need to keep all temp
- // channels open to receive replies and/or notifications to free CASes
- // Keep the temp reply channel open
- if ( destination != null && destination.isTemporary() ) {
+ // Check if we are closing just the input channel
+ if (channelsToClose == InputChannel.InputChannels) {
+ // Fetch the listener object
+ ActiveMQDestination destination = (ActiveMQDestination) mL.getListenerEndpoint();
+ // if this is a listener on a temp queue return false. We need to keep all temp
+ // channels open to receive replies and/or notifications to free CASes
+ // Keep the temp reply channel open
+ if (destination != null && destination.isTemporary()) {
return false;
}
}
return true;
}
+
public void stop() throws Exception {
- stop(InputChannel.CloseAllChannels );
+ stop(InputChannel.CloseAllChannels);
listenerContainerList.clear();
failedListenerMap.clear();
}
- public synchronized void stop(int channelsToClose) throws Exception {
-
- List<UimaDefaultMessageListenerContainer> listenersToRemove =
- new ArrayList<UimaDefaultMessageListenerContainer>();
- for( Object listenerObject : listenerContainerList) {
- final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer)listenerObject;
- if (mL != null && mL.isRunning() && doCloseChannel(mL, channelsToClose) ) {
+
+ public synchronized void stop(int channelsToClose) throws Exception {
+
+ List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
+ for (Object listenerObject : listenerContainerList) {
+ final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer) listenerObject;
+ if (mL != null && mL.isRunning() && doCloseChannel(mL, channelsToClose)) {
stopChannel(mL);
- // Just in case check if the container still in the list. If so, add it to
- // another list that container listeners that have been stopped and need
- // to be removed from the listenerContainerList. Removing the listener from
- // the listenerContainerList in this iterator loop is not working. If for
- // example the iterator has two elements, after the first remove from the
- // listenerContainerList, the iterator stops event though there is still
- // one element left. Process removal of listeners outside of the iterator
- // loop
- if ( listenerContainerList.contains(mL) ) {
+ // Just in case check if the container still in the list. If so, add it to
+ // another list that container listeners that have been stopped and need
+ // to be removed from the listenerContainerList. Removing the listener from
+ // the listenerContainerList in this iterator loop is not working. If for
+ // example the iterator has two elements, after the first remove from the
+ // listenerContainerList, the iterator stops event though there is still
+ // one element left. Process removal of listeners outside of the iterator
+ // loop
+ if (listenerContainerList.contains(mL)) {
listenersToRemove.add(mL);
}
} else {
@@ -885,56 +821,58 @@
}
}
}
- }
- // Remove listeners from the listenerContainerList
- for( UimaDefaultMessageListenerContainer mL : listenersToRemove) {
- listenerContainerList.remove(mL);
- }
- listenersToRemove.clear();
- if ( channelsToClose == InputChannel.CloseAllChannels ) {
+ }
+ // Remove listeners from the listenerContainerList
+ for (UimaDefaultMessageListenerContainer mL : listenersToRemove) {
+ listenerContainerList.remove(mL);
+ }
+ listenersToRemove.clear();
+ if (channelsToClose == InputChannel.CloseAllChannels) {
stopped = true;
}
- }
- public boolean isStopped()
- {
- return stopped;
- }
- public int getConcurrentConsumerCount()
- {
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public int getConcurrentConsumerCount() {
return messageListener.getConcurrentConsumers();
}
private void testIfBrokerRunning(String aBrokerUrl) throws Exception {
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(aBrokerUrl);
- // Try to create a test connection to make sure that the broker is available
+ // Try to create a test connection to make sure that the broker is available
Connection testConnection = null;
try {
testConnection = f.createConnection();
- } catch( Exception e ) {
+ } catch (Exception e) {
throw e;
} finally {
- if ( testConnection != null ) {
- // close test connection. Broker is running
+ if (testConnection != null) {
+ // close test connection. Broker is running
testConnection.close();
}
}
}
- public void createListener( String aDelegateKey ) throws Exception {
- if ( getController() instanceof AggregateAnalysisEngineController ) {
- Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(aDelegateKey);
- if ( delegate != null ) {
- UimaDefaultMessageListenerContainer newListener =
- new UimaDefaultMessageListenerContainer();
+ public void createListener(String aDelegateKey) throws Exception {
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ Delegate delegate = ((AggregateAnalysisEngineController) getController())
+ .lookupDelegate(aDelegateKey);
+ if (delegate != null) {
+
+ UimaDefaultMessageListenerContainer newListener = new UimaDefaultMessageListenerContainer();
testIfBrokerRunning(delegate.getEndpoint().getServerURI());
- ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(delegate.getEndpoint().getServerURI());
+ ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(delegate.getEndpoint()
+ .getServerURI());
newListener.setConnectionFactory(f);
newListener.setMessageListener(this);
newListener.setController(getController());
-
+
TempDestinationResolver resolver = new TempDestinationResolver();
- resolver.setConnectionFactory(f);
+ resolver.setConnectionFactory(f);
resolver.setListener(newListener);
newListener.setDestinationResolver(resolver);
@@ -947,119 +885,139 @@
newListener.setTaskExecutor(executor);
newListener.initialize();
newListener.start();
- // Wait until the resolver plugs in the destination
+ // Wait until the resolver plugs in the destination
while (newListener.getDestination() == null) {
- synchronized( newListener ) {
-
- System.out.println(".... Waiting For Spring Resolver to Create New Temp Reply Queue ...");
+ synchronized (newListener) {
+
+ System.out
+ .println(".... Waiting For Spring Resolver to Create New Temp Reply Queue ...");
newListener.wait(100);
}
}
newListener.afterPropertiesSet();
- // Get the endpoint object for a given delegate key from the Aggregate
- Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
- // Override the reply destination.
+ // Get the endpoint object for a given delegate key from the Aggregate
+ Endpoint endpoint = ((AggregateAnalysisEngineController) getController()).lookUpEndpoint(
+ aDelegateKey, false);
+ // Override the reply destination.
endpoint.setDestination(newListener.getDestination());
Object clone = ((Endpoint_impl) endpoint).clone();
- newListener.setTargetEndpoint((Endpoint)clone);
+ newListener.setTargetEndpoint((Endpoint) clone);
endpoint.setStatus(Endpoint.OK);
}
}
}
+
/**
* Given an endpoint name returns all listeners attached to this endpoint. There can be multiple
- * listeners on an endpoint each with a different selector to receive targeted messages like GetMeta
- * and Process.
+ * listeners on an endpoint each with a different selector to receive targeted messages like
+ * GetMeta and Process.
*
- * @param anEndpointName - name of the endpoint that is used to find associated listener(s)
+ * @param anEndpointName
+ * - name of the endpoint that is used to find associated listener(s)
*
* @return - list of listeners
*/
- private UimaDefaultMessageListenerContainer[] getListenersForEndpoint( String anEndpointName ) {
+ private UimaDefaultMessageListenerContainer[] getListenersForEndpoint(String anEndpointName) {
List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
- for( int i=0; i < listenerContainerList.size(); i++ ) {
- UimaDefaultMessageListenerContainer mListener =
- (UimaDefaultMessageListenerContainer) listenerContainerList.get(i);
- if ( mListener.getDestinationName() != null && mListener.getDestinationName().equals( anEndpointName)) {
+ for (int i = 0; i < listenerContainerList.size(); i++) {
+ UimaDefaultMessageListenerContainer mListener = (UimaDefaultMessageListenerContainer) listenerContainerList
+ .get(i);
+ if (mListener.getDestinationName() != null
+ && mListener.getDestinationName().equals(anEndpointName)) {
listeners.add(mListener);
- } else if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) {
+ } else if (mListener.getDestination() != null
+ && mListener.getDestination().toString().equals(anEndpointName)) {
listeners.add(mListener);
}
}
- if ( listeners.size() > 0 ) {
- UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners.size()];
+ if (listeners.size() > 0) {
+ UimaDefaultMessageListenerContainer[] listenerArray = new UimaDefaultMessageListenerContainer[listeners
+ .size()];
listeners.toArray(listenerArray);
return listenerArray;
}
return null;
}
+
/**
*
*/
- public void destroyListener( final String anEndpointName, String aDelegateKey ) {
- // check if delegate listener has already been placed in the failed listeners list
- // If so, nothing else to do here
- if ( failedListenerMap.containsKey(aDelegateKey) ){
+ public void destroyListener(final String anEndpointName, String aDelegateKey) {
+ // check if delegate listener has already been placed in the failed listeners list
+ // If so, nothing else to do here
+ if (failedListenerMap.containsKey(aDelegateKey)) {
return;
}
- // Fetch all associated listeners.
+ // Fetch all associated listeners.
final UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
- if ( mListeners == null ) {
+ if (mListeners == null) {
return;
}
- // Stop each listener
- for( final UimaDefaultMessageListenerContainer mListener : mListeners ) {
- if ( !mListener.isRunning() ) {
+ // Stop each listener
+ for (final UimaDefaultMessageListenerContainer mListener : mListeners) {
+ if (!mListener.isRunning()) {
continue; // Already Stopped
}
try {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)
+ && mListener.getDestination() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
+ "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stop_listener__INFO",
new Object[] { mListener.getDestination().toString() });
}
- // Spin a thread that will stop the listener and wait for its shutdown
+ // Spin a thread that will stop the listener and wait for its shutdown
Thread stopThread = new Thread("InputChannelStopThread") {
public void run() {
mListener.stop();
- // wait until the listener shutsdown
- while( mListener.isRunning());
- System.out.println("Thread:"+Thread.currentThread().getId()+"++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) && mListener.getDestination() != null) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "destroyListener", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
- new Object[] {controller.getComponentName(), mListener.getDestination().toString() });
+ // wait until the listener shutsdown
+ while (mListener.isRunning())
+ ;
+ System.out.println("Thread:" + Thread.currentThread().getId()
+ + "++++ Listener on Queue:" + anEndpointName + " Has Been Stopped...");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)
+ && mListener.getDestination() != null) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "destroyListener",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stopped_listener_INFO",
+ new Object[] { controller.getComponentName(),
+ mListener.getDestination().toString() });
}
}
};
stopThread.start();
- if ( getController() != null ) {
- Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
+ if (getController() != null) {
+ Endpoint endpoint = ((AggregateAnalysisEngineController) getController()).lookUpEndpoint(
+ aDelegateKey, false);
endpoint.setStatus(Endpoint.FAILED);
- if ( mListener.getConnectionFactory() != null) {
- if ( getController() instanceof AggregateAnalysisEngineController ) {
- if ( !failedListenerMap.containsKey(aDelegateKey )) {
- failedListenerMap.put( aDelegateKey, mListener);
+ if (mListener.getConnectionFactory() != null) {
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ if (!failedListenerMap.containsKey(aDelegateKey)) {
+ failedListenerMap.put(aDelegateKey, mListener);
listenerContainerList.remove(mListener);
}
}
}
}
- //}
- } catch( Exception e) {
+ // }
+ } catch (Exception e) {
e.printStackTrace();
}
}
}
+
public boolean isFailed(String aDelegateKey) {
return failedListenerMap.containsKey(aDelegateKey);
}
- public boolean isListenerForDestination( String anEndpointName) {
- UimaDefaultMessageListenerContainer[] mListeners =
- getListenersForEndpoint(anEndpointName);
- if ( mListeners == null ) {
+
+ public boolean isListenerForDestination(String anEndpointName) {
+ UimaDefaultMessageListenerContainer[] mListeners = getListenersForEndpoint(anEndpointName);
+ if (mListeners == null) {
return false;
}
return true;
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannelMBean.java Wed Sep 2 15:16:23 2009
@@ -21,9 +21,8 @@
import java.io.Serializable;
-public interface JmsInputChannelMBean extends Serializable
-{
- public String getBrokerURL();
-
- public String getInputQueueName();
+public interface JmsInputChannelMBean extends Serializable {
+ public String getBrokerURL();
+
+ public String getInputQueueName();
}