You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/05/20 19:32:31 UTC
svn commit: r1744789 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
Author: cwiklik
Date: Fri May 20 19:32:31 2016
New Revision: 1744789
URL: http://svn.apache.org/viewvc?rev=1744789&view=rev
Log:
UIMA-4927 modified to use plain jms for pinging uima-as service
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java?rev=1744789&r1=1744788&r2=1744789&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/UimaAsPing.java Fri May 20 19:32:31 2016
@@ -23,22 +23,31 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.client.UimaASProcessStatus;
-import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
-import org.apache.uima.aae.client.UimaAsynchronousEngine;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
-import org.apache.uima.cas.CAS;
-import org.apache.uima.collection.EntityProcessStatus;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.ducc.common.IServiceStatistics;
import org.apache.uima.ducc.common.TcpStreamHandler;
import org.apache.uima.ducc.common.utils.DuccProperties;
-import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.util.Level;
public class UimaAsPing
extends AServicePing
@@ -62,6 +71,20 @@ public class UimaAsPing
String pid;
boolean gmfail = false;
+ private Connection connection;
+
+ private Session producerSession;
+
+ private MessageProducer producer;
+
+ private Session consumerSession;
+
+ private TemporaryQueue consumerDestination;
+
+ private String brokerURI;
+
+ private MessageConsumer consumer;
+
public UimaAsPing()
{
}
@@ -89,13 +112,7 @@ public class UimaAsPing
}
broker_host = url.getHost();
// not needed here fyi broker_port = url.getPort();
-
-
- UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
- UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
- // there are a couple junky messages that slip by the above configurations. turn the whole danged thing off.
- UIMAFramework.getLogger().setLevel(Level.OFF);
-
+
if ( args == null ) {
meta_timeout = 5000;
broker_jmx_port = 1099;
@@ -115,19 +132,53 @@ public class UimaAsPing
}
meta_timeout = props.getIntProperty ("meta-timeout" , 5000);
String broker_tmp_jmx = props.getProperty ("broker-jmx-port");
+
+
+
if ( broker_tmp_jmx.equals("none") ) {
broker_jmx_port = -1;
this.monitor = null;
} else {
broker_jmx_port = props.getIntProperty("broker-jmx-port", 1099);
+ doLog("init","Initializing UimaAsServiceMonitor: endpoint:"+endpoint+" broker_host:"+broker_host+" broker_jmx_port:"+broker_jmx_port);
this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
}
}
}
+ private void initJMS(String brokerURI ) throws JMSException { // Connects to the ActiveMQ broker at brokerURI and creates the request producer and reply consumer; any JMS failure propagates as JMSException.
+ String methodName = "initJMS"; // tag passed to doLog
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+ this.brokerURI = brokerURI; // kept in a field; getStatistics copies it into the request's ServerURI property
+ connection = factory.createConnection();
+ connection.start(); // per JMS, message delivery to consumers begins only once the connection is started
+ doLog(methodName, "Connection started");
+ // Two sessions share the one connection. NOTE(review): presumably split because the reply is received on an executor thread while the send happens on the calling thread - confirm.
+ producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted, auto-acknowledge
+ Queue producerQueue = producerSession.createQueue(endpoint); // endpoint is declared outside this hunk; used here as the target queue name
+ producer = producerSession.createProducer(producerQueue);
+ consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // same settings as the producer session
+ consumerDestination = consumerSession.createTemporaryQueue(); // temporary queue; getStatistics sets it as the request's JMSReplyTo
+ consumer = consumerSession.createConsumer(consumerDestination);
+ doLog(methodName, "Created queues and sessions");
+ }
public void stop()
- {
+ { try { // release the JMS objects created by initJMS: both sessions first, then the connection
+ if (producerSession != null) { // still null if initJMS has not created it
+ producerSession.close();
+ }
+ if (consumerSession != null) {
+ consumerSession.close();
+ }
+ if ( connection != null ) {
+ connection.close();
+ }
+ } catch (JMSException e) { // NOTE(review): a JMSException from an earlier close() skips the remaining close() calls in this try, including connection.close()
+ e.printStackTrace(); // written to stderr only; the exception is not rethrown
+ }
+ // NOTE(review): getStatistics calls stop() from its finally block, so the monitor.stop() below also runs on that path - confirm this is intended.
+ // NOTE(review): the JMS fields are not reset to null here, so a later stop() without a new initJMS would call close() on them again.
if ( monitor != null ) monitor.stop();
}
@@ -147,7 +198,8 @@ public class UimaAsPing
stats.setHealthy(true); // this pinger defines 'healthy' as
// 'service responds to get-meta and broker returns jmx stats'
} catch ( Throwable t ) {
- stats.setHealthy(false);
+ doLog("evaluateService", "EXCEPTION::::"+ExceptionUtils.getStackTrace(t));
+ stats.setHealthy(false);
monitor.setJmxFailure(t.getMessage());
}
}
@@ -163,6 +215,8 @@ public class UimaAsPing
public IServiceStatistics getStatistics()
{
String methodName = "getStatistics";
+ doLog(methodName, "***********************************************");
+
IServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
String failure_reason = null;
@@ -170,37 +224,93 @@ public class UimaAsPing
pid = "N/A";
evaluateService(statistics); // if we get here, the get-meta worked well enough
-
- // Instantiate Uima AS Client
- BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
- UimaCbListener listener = new UimaCbListener();
- uimaAsEngine.addStatusCallbackListener(listener);
- Map<String, Object> appCtx = new HashMap<String, Object>();
- appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
- appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
- appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
- appCtx.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, "1000");
-
- ResourceInitializationException excp = null;
+ ExecutorService executor = null;
+ Exception excp = null;
gmfail = false;
+ Future<Boolean> future = null;
try {
- uimaAsEngine.initialize(appCtx);
+ initJMS(broker);
+
+ TextMessage msg = producerSession.createTextMessage();
+ msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
+ msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
+ msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
+ msg.setJMSReplyTo(consumerDestination);
+ msg.setText("");
+
+ doLog(methodName, "Sending getMeta request to " + endpoint + " at " + brokerURI);
+ producer.send(msg);
+ long startTime = System.currentTimeMillis();
+ executor = Executors.newSingleThreadExecutor();
+ future = executor.submit(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ // First receive() is to get IP and PID of the process that will process getMeta
+ ActiveMQTextMessage serviceInfoReply = (ActiveMQTextMessage) consumer.receive();
+ nodeIp = serviceInfoReply.getStringProperty(AsynchAEMessage.ServerIP);
+ pid = serviceInfoReply.getStringProperty(AsynchAEMessage.UimaASProcessPID);
+ // second receive() is for GetMeta reply
+ // dont need to process the actual reply. If receive() succeeds we have
+ // a good GetMeta call
+ consumer.receive();
+ return true;
+ }
+ }
+ );
+ // wait for getMeta reply and timeout if not received within allotted window
+ future.get(meta_timeout, TimeUnit.MILLISECONDS);
+ future.cancel(true);
+ long replyTime = System.currentTimeMillis() - startTime;
statistics.setAlive(true);
statistics.setHealthy(true && statistics.isHealthy());
- listener.ok();
- } catch( ResourceInitializationException e ) {
+
+ statistics.setInfo("Get-meta took " + replyTime + " msecs.");
+ doLog(methodName, "Reply received in ", replyTime, " ms");
+ gmfail = false;
+ } catch ( ExecutionException e) {
+ excp = e;
+ gmfail = true;
+ statistics.setHealthy(false);
+ statistics.setAlive(false);
+ statistics.setInfo("Ping error: " + e);
+ doLog(methodName, null, "Error while awaiting getmeta reply from ", nodeIp, "PID", pid);
+ if ( future != null ) {
+ future.cancel(true);
+ }
+ } catch ( InterruptedException e) {
excp = e;
- listener.timeout();
+ gmfail = true;
+ statistics.setHealthy(false);
+ statistics.setAlive(false);
+ statistics.setInfo("Ping error: " + e);
+ doLog(methodName, null, "Thread interrupted while waiting for getmeta reply from ", nodeIp, "PID", pid);
+ if ( future != null ) {
+ future.cancel(true);
+ }
+ } catch( TimeoutException e) {
+ excp = e;
+ gmfail = true;
statistics.setHealthy(false);
statistics.setAlive(false);
+ statistics.setInfo("Ping error: " + e);
+ doLog(methodName, null, "Get-Meta timeout ("+meta_timeout+" ms) from ", nodeIp, "PID", pid);
+ if ( future != null ) {
+ future.cancel(true);
+ }
+
+ } catch (JMSException e) {
+ excp = e;
+ gmfail = true;
+ statistics.setHealthy(false);
+ statistics.setAlive(false);
+ statistics.setInfo("Ping error: " + e);
+ //e.printStackTrace();
} finally {
- try {
- uimaAsEngine.stop();
- } catch (Throwable e) {
- doLog(methodName, "Exception on UIMA-AS connection stop: " + e.toString());
- }
+ stop();
+ if ( executor != null ) {
+ executor.shutdownNow();
+ }
}
-
if ( gmfail || excp != null ) {
failure_reason = "Cannot issue getMeta to: " + endpoint + ":" + broker;
if ( excp != null ) {
@@ -226,57 +336,4 @@ public class UimaAsPing
return statistics;
}
-
- class UimaCbListener extends UimaAsBaseCallbackListener
- {
- public UimaCbListener()
- {
- }
-
- public void ok()
- {
- // String methodName = "UimaAsPing:get-meta";
- gmfail = false;
- }
-
- public void timeout()
- {
- String methodName = "UimaAsPing:get-meta";
- doLog(methodName, null, "Get-Meta timeout from ", nodeIp, "PID", pid);
- gmfail = true;
- }
-
- public void onBeforeMessageSend(UimaASProcessStatus status)
- {
- }
-
-// private void onBeforeMessageSendHandler(UimaASProcessStatus status)
-// {
-// }
-
- public void onBeforeProcessMeta(String IP, String p)
- {
- String methodName = "UimaAsPing:onBeforeProcessMeta";
- doLog(methodName, null, "Get-Meta received from ", IP, ":", p, "for", ep);
- pid = p;
- nodeIp = IP;
- }
-
-// private void onBeforeProcessCASHandler(UimaASProcessStatus status, String nodeIP, String pid)
-// {
-// }
-
- public void initializationComplete(EntityProcessStatus aStatus)
- {
- }
-
- public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus)
- {
- }
-
- public void collectionProcessComplete(EntityProcessStatus aStatus)
- {
- }
- }
-
}