You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/03/26 16:05:25 UTC

svn commit: r927899 - /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java

Author: cwiklik
Date: Fri Mar 26 15:05:25 2010
New Revision: 927899

URL: http://svn.apache.org/viewvc?rev=927899&view=rev
Log:
UIMA-1726 Check remote broker for existence of reply queue to determine if a message should be processed. The check is done via JMX. If the broker is configured to run with no jmx, every message is processed and not subject to the optimization. 

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=927899&r1=927898&r2=927899&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Mar 26 15:05:25 2010
@@ -22,6 +22,7 @@ package org.apache.uima.adapter.jms.acti
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
@@ -47,6 +48,7 @@ import org.apache.uima.aae.delegate.Dele
 import org.apache.uima.aae.error.InvalidMessageException;
 import org.apache.uima.aae.handler.Handler;
 import org.apache.uima.aae.handler.HandlerBase;
+import org.apache.uima.aae.jmx.RemoteJMXServer;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
@@ -102,6 +104,10 @@ public class JmsInputChannel implements 
 
   private Object mux = new Object();
 
+  private RemoteJMXServer remoteJMXServer = null;
+  //  synchronizes initialization of the remote broker JMX connection
+  private Object brokerMux = new Object();
+  
   private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap = new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
 
   public AnalysisEngineController getController() {
@@ -478,7 +484,63 @@ public class JmsInputChannel implements 
 
     }
   }
-
+  /**
+   * Tells whether the sender of an incoming message expects a reply.
+   * 
+   * @param aMessage - incoming message to inspect
+   * @return - true for a Process, GetMeta or CollectionProcessComplete request, false otherwise
+   */
+  private boolean isReplyRequired(Message aMessage ) {
+    try {
+      int cmd = aMessage.getIntProperty(AsynchAEMessage.Command);
+      if (aMessage.getIntProperty(AsynchAEMessage.MessageType) != AsynchAEMessage.Request) {
+        return false;
+      }
+      return cmd == AsynchAEMessage.Process
+              || cmd == AsynchAEMessage.GetMeta
+              || cmd == AsynchAEMessage.CollectionProcessComplete;
+    } catch( Exception e) {
+      //  properties could not be read; fall through and report no reply required
+    }
+    return false;
+  }
+  private boolean validEndpoint(JmsMessageContext messageContext) {
+    //  A reply needs a message context, an endpoint and a reply destination
+    return (messageContext != null && messageContext.getEndpoint() != null
+            && messageContext.getEndpoint().getDestination() != null);
+  }
+  /**
+   * Determines if a given message should be processed or not. If the message
+   * requires a response, the code checks if the client's temp reply queue still 
+   * exists. If so, the message must be processed. If the reply queue doesn't
+   * exist, the client must have terminated and there is no need to process
+   * the message. The code consults the broker (via JMX) to see if the queue
+   * exists. Without a message context or a JMX connection the message is processed.
+   * 
+   * @param aMessage - incoming message to check
+   * @param messageContext - context of the message; supplies the reply endpoint
+   * @return - false only if a required reply queue no longer exists, true otherwise
+   * @throws Exception - propagated from the message context or the JMX lookup
+   */
+  private boolean processRequestMessage(Message aMessage, JmsMessageContext messageContext) throws Exception {
+    //  Read the field once: another thread may clear or replace it while we run
+    RemoteJMXServer jmxServer = remoteJMXServer;
+    //  Drop a message only when the broker's JMX server shows that the client's temp
+    //  reply queue is gone. No need to waste cycles when the client has terminated.
+    if ( jmxServer != null && messageContext != null && isReplyRequired(aMessage)
+            //  Exclude request messages from a Cas Multiplier. We are not replying
+            //  to CM. Only CM adds CasSequence to a request message
+            && !messageContext.propertyExists(AsynchAEMessage.CasSequence)
+            //  Reply destination must be present
+            && validEndpoint(messageContext) ) {
+      //  replace ':' with '_' to enable JMX query to work. ':' is an invalid char for queries
+      String queueName = jmxServer.normalize(messageContext.getEndpoint().getDestination().toString());
+      //  Check if reply queue provided in a message still exists in broker's
+      //  JMX Server registry. In case the server is not available the call returns true.
+      return jmxServer.isClientReplyQueueAvailable(queueName);
+    }
+    return true; // Default, PROCESS THE MESSAGE
+  }
   /**
    * Receives Messages from the JMS Provider. It checks the message header to determine the type of
    * message received. Based on the type, a MessageContext is created to facilitate access to the
@@ -518,7 +580,38 @@ public class JmsInputChannel implements 
         eN = "";
       }
     }
-
+    
+    // The following creates remote connection to a JMX Server running in AMQ broker
+    // that manages this service input queue. The connection is done once and cached
+    //  for subsequent use. The default is to assume that the JMX Server is *not* 
+    //  available. In such case, the optimization to check for existence of reply
+    //  queue is not done and every message is processed.
+    boolean jmxServerAvailable = false;
+    //  relevant to top level service
+    if ( controller.isTopLevelComponent() && getBrokerURL() != null ) {
+      synchronized(brokerMux) {
+        //  check if the connection is valid. If not, create a new connection to
+        //  MBeanServer and cache it.
+        if (remoteJMXServer == null || !remoteJMXServer.isServerAvailable() ) {
+          //  create connection to this service Broker's JMX Server to enable queue lookups.
+          //  The lookup allows the service to determine if it should process a message. It
+          //  checks the server for existence of a reply queue provided in request msg. If 
+          //  the lookup fails, it means that the client sending a request msg has terminated
+          //  and a temp reply queue associated with the client has been deleted. There is 
+          //  no reason to process a msg if we know that the client is dead.
+          //  NOTE: the call to attachToRemoteBrokerJMXServer() handles exceptions and does
+          //        not rethrow them. In case there was a problem connecting to the server
+          //        its internal status will say NotInitialized.
+          attachToRemoteBrokerJMXServer();
+          //  check if we failed in the above method.
+          if (remoteJMXServer != null && remoteJMXServer.isInitialized()) {
+            jmxServerAvailable = true; 
+          }
+        } else {
+          jmxServerAvailable = true;
+        }
+      }
+    }
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
               JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_recvd_msg__FINE", new Object[] { eN });
@@ -542,32 +635,34 @@ public class JmsInputChannel implements 
         // Request or Response
         messageType = decodeIntToString(AsynchAEMessage.MessageType, aMessage
                 .getIntProperty(AsynchAEMessage.MessageType));
-        // Check if the request message contains a reply destination known to have
-        // been deleted. This could be the case if the client has been shutdown after
-        // sending requests. Previous attempt to deliver a reply failed and the client's
-        // reply endpoint was placed on the DoNotProcess List. For optimization, all
-        // requests from a dead client will be dropped.
-        if (aMessage.getIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
-                && messageContext != null
-                && messageContext.getEndpoint() != null
-                && messageContext.getEndpoint().getDestination() != null
-                && getController().isEndpointOnDontProcessList(
-                        messageContext.getEndpoint().getDestination().toString())) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.INFO,
-                    CLASS_NAME.getName(),
-                    "onMessage",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_dropping_msg_client_is_dead__INFO",
-                    new Object[] { controller.getComponentName(),
-                        messageContext.getEndpoint().getDestination(), casRefId });
+        //  check if we should process this message. If there is a connection
+        //  to a remote MBeanServer we check for existence of a temp reply queue.
+        //  If the queue exists, the message is allowed to be processed. Otherwise,
+        //  the client has terminated taking down its temp reply queue. There is
+        //  no reason to process the message.
+        try {
+          if ( jmxServerAvailable && // check if we have valid connection to MBeanServer
+                  //  The following returns false if a reply queue does not exist
+                  //  in MBeanServer registry
+                  !processRequestMessage(aMessage, messageContext) ) {
+            //  Reply queue has been deleted
+            if ( validEndpoint(messageContext) &&
+                 UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                    UIMAFramework.getLogger(CLASS_NAME).logrb(
+                            Level.INFO,
+                            CLASS_NAME.getName(),
+                            "onMessage",
+                            JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                            "UIMAJMS_dropping_msg_client_is_dead__INFO",
+                            new Object[] { controller.getComponentName(),
+                              messageContext.getEndpoint().getDestination(), casRefId });
+            }
+            return;   // DROP the message because the client has terminated
           }
-          return;
-        }
-
-        if (ackMessageNow(aMessage)) {
-          aMessage.acknowledge();
+        } catch( Exception e) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                  "getLoadedJars", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAEE_exception__WARNING", e);
         }
         String msgSentFromIP = null;
 
@@ -678,7 +773,87 @@ public class JmsInputChannel implements 
       }
     }
   }
+  /**
+   * Connects to this service Broker's JMX Server. The method uses default JMX Port
+   * number 1099 to create a connection to the Broker's JMX MBean server. The default
+   * can be overridden via System property 'activemq.broker.jmx.port'. Connection
+   * failures are not propagated to the caller; callers must check the state of the
+   * cached connection (it may be null or not initialized) before using it.
+   * 
+   */
+  private void attachToRemoteBrokerJMXServer() {
+    remoteJMXServer = new RemoteJMXServer();
+    //  Use the JMX port given on the command line, if any; otherwise default to 1099
+    String jmxPort = System.getProperty("activemq.broker.jmx.port");
+    if ( jmxPort == null || jmxPort.trim().length() == 0 ) {
+      jmxPort = "1099";  // default
+    }
+    //  Now check if the provided port is actually a number
+    try {
+      //  This is here to test provided port number. The converted value is ignored
+      Integer.parseInt(jmxPort);
+    } catch( NumberFormatException e ) {
+      jmxPort = "1099";   // default
+    }
+    //  Fetch AMQ jmx domain from system properties. This property is not required
+    //  and the default AMQ jmx is used. The only exception is when the service is
+    //  deployed in a jvm with multiple brokers deployed as it is the case with jUnit 
+    //  tests. In such a case, each broker will register self with JMX using a different
+    //  domain.
+    String jmxAMQDomain = System.getProperty("activemq.broker.jmx.domain");
+    if ( jmxAMQDomain == null ) {
+      jmxAMQDomain = "org.apache.activemq";    // default
+    }
+    String brokerHostname="";
+    try {
+      //  Using this service Broker URL, extract a hostname to enable JMX queries
+      String brokerURL = getBrokerURL();
+      if ( brokerURL.startsWith("failover")) {
+        //  URLs come as "failover:(url,...,url)" or, without parentheses, "failover:url,...,url"
+        int open = brokerURL.indexOf("(");
+        int close = brokerURL.indexOf(")", open + 1);
+        String brokerUrlList = ( open >= 0 && close > open )
+                ? brokerURL.substring(open + 1, close)
+                : brokerURL.substring(brokerURL.indexOf(":") + 1);
+        //  Tokenize list using "," as delimiter
+        StringTokenizer tokenizer = new StringTokenizer(brokerUrlList, ",");
+        while(tokenizer.hasMoreTokens()) {
+          try {
+            //  parse the url to extract just the node name
+            brokerHostname = extractNodeName(tokenizer.nextToken().trim());
+            //  Connect to a remote JMX Server; on failure try the next url from the list
+            remoteJMXServer.initialize(jmxAMQDomain, brokerHostname,jmxPort);
+            break;  // got the connection to a JMX server
+          } catch ( Exception e) {
+            //  silently fail, try another broker from the failover list
+          }
+        }
+        if ( remoteJMXServer != null && !remoteJMXServer.isInitialized()) {
+          remoteJMXServer = null;   // Not supported
+        }
+      } else if ( brokerURL.startsWith("tcp") || brokerURL.startsWith("http")) {
+        brokerHostname = extractNodeName(brokerURL);
+        //  Connect to a remote JMX Server
+        remoteJMXServer.initialize(jmxAMQDomain, brokerHostname,jmxPort);
+      }
+    } catch( Exception e) {
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                "attachToRemoteBrokerJMXServer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAJMS_exception__WARNING", e);
+      }
+      remoteJMXServer = null;
+    }
+  }
 
+  private String extractNodeName(String url) {
+    int startPos = url.indexOf("//");
+    //  Strip the protocol, if present (indexOf returns -1 when there is none)
+    String temp = ( startPos < 0 ) ? url : url.substring(startPos+2);
+    //  The hostname ends at the first port, path or query separator; when none
+    //  is present the whole remainder is the hostname
+    return temp.split("[:/?]", 2)[0].trim();
+  }
   public int getSessionAckMode() {
     return sessionAckMode;
   }
@@ -814,6 +989,10 @@ public class JmsInputChannel implements 
     stop(InputChannel.CloseAllChannels);
     listenerContainerList.clear();
     failedListenerMap.clear();
+    if ( remoteJMXServer != null ) {
+      remoteJMXServer.disconnect();
+      remoteJMXServer = null;
+    }
   }
 
   public synchronized void stop(int channelsToClose) throws Exception {