You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/05/20 19:32:31 UTC

svn commit: r1744789 - /uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java

Author: cwiklik
Date: Fri May 20 19:32:31 2016
New Revision: 1744789

URL: http://svn.apache.org/viewvc?rev=1744789&view=rev
Log:
UIMA-4927 modified to use plain jms for pinging uima-as service

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java?rev=1744789&r1=1744788&r2=1744789&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java Fri May 20 19:32:31 2016
@@ -23,22 +23,31 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.client.UimaASProcessStatus;
-import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
-import org.apache.uima.aae.client.UimaAsynchronousEngine;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
-import org.apache.uima.cas.CAS;
-import org.apache.uima.collection.EntityProcessStatus;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.ducc.common.IServiceStatistics;
 import org.apache.uima.ducc.common.TcpStreamHandler;
 import org.apache.uima.ducc.common.utils.DuccProperties;
-import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.util.Level;
 
 public class UimaAsPing
     extends AServicePing
@@ -62,6 +71,20 @@ public class UimaAsPing
     String pid;
     boolean gmfail = false;
 
+    private Connection connection;
+
+    private Session producerSession;
+
+    private MessageProducer producer;
+
+    private Session consumerSession;
+
+    private TemporaryQueue consumerDestination;
+
+    private String brokerURI;
+
+    private MessageConsumer consumer;
+
     public UimaAsPing()
     {
     }
@@ -89,13 +112,7 @@ public class UimaAsPing
         }
         broker_host = url.getHost();
         // not needed here fyi broker_port = url.getPort();
-
-                
-        UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
-        UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
-        // there are a couple junky messages that slip by the above configurations.  turn the whole danged thing off.
-        UIMAFramework.getLogger().setLevel(Level.OFF);
-
+        
         if ( args == null ) {
             meta_timeout = 5000;
             broker_jmx_port = 1099;
@@ -115,19 +132,53 @@ public class UimaAsPing
             }
             meta_timeout          = props.getIntProperty    ("meta-timeout"   , 5000);
             String broker_tmp_jmx = props.getProperty       ("broker-jmx-port");
+
+            
+
             if ( broker_tmp_jmx.equals("none") ) {
                 broker_jmx_port = -1;
                 this.monitor = null;
             } else {
                 broker_jmx_port = props.getIntProperty("broker-jmx-port", 1099);
+                doLog("init","Initializing UimaAsServiceMonitor: endpoint:"+endpoint+" broker_host:"+broker_host+" broker_jmx_port:"+broker_jmx_port);
                 this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
             }
         }
 
     }
 
+    private void initJMS(String brokerURI ) throws JMSException {
+        String methodName = "initJMS";
+    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+    	this.brokerURI = brokerURI; 
+		connection = factory.createConnection();
+        connection.start();
+        doLog(methodName, "Connection started");
+
+        producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue producerQueue = producerSession.createQueue(endpoint);
+        producer = producerSession.createProducer(producerQueue);
+        consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerDestination = consumerSession.createTemporaryQueue();
+        consumer = consumerSession.createConsumer(consumerDestination);
+        doLog(methodName, "Created queues and sessions");
+    }
     public void stop()
-    {
+    { try {
+        if (producerSession != null) {
+            producerSession.close();
+        }
+        if (consumerSession != null) {
+            consumerSession.close();
+        }
+        if ( connection != null ) {
+        	connection.close();
+        }
+    } catch (JMSException e) {                                                                                                                                                                       
+        e.printStackTrace();
+    }
+
+    	
         if ( monitor != null ) monitor.stop();
     }
 
@@ -147,7 +198,8 @@ public class UimaAsPing
             stats.setHealthy(true);       // this pinger defines 'healthy' as
                                           // 'service responds to get-meta and broker returns jmx stats'
         } catch ( Throwable t ) {
-            stats.setHealthy(false);
+            doLog("evaluateService", "EXCEPTION::::"+ExceptionUtils.getStackTrace(t));
+        	stats.setHealthy(false);
             monitor.setJmxFailure(t.getMessage());
         }
     }
@@ -163,6 +215,8 @@ public class UimaAsPing
     public IServiceStatistics getStatistics()
     {
         String methodName = "getStatistics";
+        doLog(methodName, "***********************************************");
+
         IServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
         String failure_reason = null;
 
@@ -170,37 +224,93 @@ public class UimaAsPing
         pid = "N/A";
 
         evaluateService(statistics);       // if we get here, the get-meta worked well enough
-
-        // Instantiate Uima AS Client
-        BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
-        UimaCbListener listener = new UimaCbListener();
-        uimaAsEngine.addStatusCallbackListener(listener);
-        Map<String, Object> appCtx = new HashMap<String, Object>();
-        appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
-        appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
-        appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
-        appCtx.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
-
-        ResourceInitializationException excp = null;
+        ExecutorService executor = null;
+        Exception excp = null;
         gmfail = false;
+        Future<Boolean> future = null;
         try {
-            uimaAsEngine.initialize(appCtx);
+        	initJMS(broker);
+        	
+            TextMessage msg = producerSession.createTextMessage();
+            msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
+            msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+            msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+            msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
+            msg.setJMSReplyTo(consumerDestination);
+            msg.setText("");
+
+            doLog(methodName, "Sending getMeta request to " + endpoint + " at " + brokerURI);
+            producer.send(msg);
+            long startTime = System.currentTimeMillis();
+            executor = Executors.newSingleThreadExecutor();
+            future = executor.submit(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                	// First receive() is to get IP and PID of the process that will process getMeta
+                    ActiveMQTextMessage serviceInfoReply = (ActiveMQTextMessage) consumer.receive();
+                    nodeIp = serviceInfoReply.getStringProperty(AsynchAEMessage.ServerIP); 
+        		    pid = serviceInfoReply.getStringProperty(AsynchAEMessage.UimaASProcessPID);
+                    // second receive() is for GetMeta reply
+        		    // dont need to process the actual reply. If receive() succeeds we have 
+        		    // a good GetMeta call
+        		    consumer.receive();
+                	return true;
+                }
+              }
+            );
+            // wait for getMeta reply and timeout if not received within allotted window
+            future.get(meta_timeout, TimeUnit.MILLISECONDS);
+            future.cancel(true);
+            long replyTime = System.currentTimeMillis() - startTime;
             statistics.setAlive(true);
             statistics.setHealthy(true && statistics.isHealthy());
-            listener.ok();
-        } catch( ResourceInitializationException e ) {
+
+            statistics.setInfo("Get-meta took " + replyTime + " msecs.");
+            doLog(methodName, "Reply received in ", replyTime, " ms");
+            gmfail = false;
+        } catch ( ExecutionException e) {
+            excp = e;
+            gmfail = true;
+            statistics.setHealthy(false);
+            statistics.setAlive(false);
+            statistics.setInfo("Ping error: " + e);
+            doLog(methodName, null, "Error while awaiting getmeta reply from ", nodeIp, "PID", pid);
+        	if ( future != null ) {
+        		future.cancel(true);
+        	}
+        } catch ( InterruptedException e) {
             excp = e;
-            listener.timeout();
+            gmfail = true;
+            statistics.setHealthy(false);
+            statistics.setAlive(false);
+            statistics.setInfo("Ping error: " + e);
+            doLog(methodName, null, "Thread interrupted while waiting for getmeta reply from ", nodeIp, "PID", pid);
+        	if ( future != null ) {
+        		future.cancel(true);
+        	}
+        } catch( TimeoutException e) {
+            excp = e;
+            gmfail = true;
             statistics.setHealthy(false);
             statistics.setAlive(false);
+            statistics.setInfo("Ping error: " + e);
+            doLog(methodName, null, "Get-Meta timeout ("+meta_timeout+" ms) from ", nodeIp, "PID", pid);
+        	if ( future != null ) {
+        		future.cancel(true);
+        	}
+        	
+        } catch (JMSException e) {
+            excp = e;
+            gmfail = true;
+            statistics.setHealthy(false);
+            statistics.setAlive(false);
+            statistics.setInfo("Ping error: " + e);
+            //e.printStackTrace();
         } finally {
-            try {
-				uimaAsEngine.stop();
-			} catch (Throwable e) {
-				doLog(methodName, "Exception on UIMA-AS connection stop: " + e.toString());
-			}
+        	stop();
+        	if ( executor != null ) {
+            	executor.shutdownNow();
+        	}
         }
-
         if ( gmfail || excp != null ) {
             failure_reason = "Cannot issue getMeta to: " + endpoint + ":" + broker; 
             if ( excp != null ) {
@@ -226,57 +336,4 @@ public class UimaAsPing
 
         return statistics;
     }
-
-    class UimaCbListener extends UimaAsBaseCallbackListener 
-    {
-        public UimaCbListener()
-        {
-        }
-
-        public void ok()
-        {
-            // String methodName = "UimaAsPing:get-meta";
-            gmfail = false;
-        }
-
-        public void timeout()
-        {
-            String methodName = "UimaAsPing:get-meta";
-            doLog(methodName, null, "Get-Meta timeout from ", nodeIp, "PID", pid);
-            gmfail = true;
-        }
-
-        public void onBeforeMessageSend(UimaASProcessStatus status) 
-        {
-        }
-	
-//        private void onBeforeMessageSendHandler(UimaASProcessStatus status) 
-//        {
-//        }
-	
-        public void onBeforeProcessMeta(String IP, String p)
-        {
-            String methodName = "UimaAsPing:onBeforeProcessMeta";
-            doLog(methodName, null, "Get-Meta received from ", IP, ":", p, "for", ep);
-            pid = p;
-            nodeIp = IP;
-        }
-	
-//        private void onBeforeProcessCASHandler(UimaASProcessStatus status, String nodeIP, String pid) 
-//        {
-//        }
-	
-        public void initializationComplete(EntityProcessStatus aStatus) 
-        {
-        }
-
-        public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) 
-        {
-        }
-
-        public void collectionProcessComplete(EntityProcessStatus aStatus) 
-        {
-        }
-    }
-
 }