You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/04/18 16:50:00 UTC
svn commit: r1469356 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator:
ORTracer.java maintenance/MaintenanceThread.java maintenance/MqReaper.java
Author: degenaro
Date: Thu Apr 18 14:50:00 2013
New Revision: 1469356
URL: http://svn.apache.org/r1469356
Log:
UIMA-2819 DUCC orchestrator (OR) re-factor reaper for re-usability & employ broker.name correctly
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java Thu Apr 18 14:50:00 2013
@@ -39,6 +39,6 @@ public class ORTracer implements Process
@Override
public void process(Exchange arg0) throws Exception {
String location = "process";
- logger.info(location, jobid, name);
+ logger.debug(location, jobid, name);
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java Thu Apr 18 14:50:00 2013
@@ -44,7 +44,6 @@ public class MaintenanceThread extends T
private StateManager stateManager = StateManager.getInstance();
private HealthMonitor healthMonitor = HealthMonitor.getInstance();
- private MqReaper mqReaper = MqReaper.getInstance();
private long minMillis = 1000;
private long wakeUpMillis = 2*60*1000;
@@ -102,7 +101,14 @@ public class MaintenanceThread extends T
if(isTime()) {
stateManager.prune(workMap);
healthMonitor.ajudicate();
- mqReaper.removeUnusedJdQueues(workMap);
+ try {
+ MqReaper mqReaper = MqReaper.getInstance();
+ mqReaper.removeUnusedJdQueues(workMap);
+ }
+ catch(Exception e) {
+ MqReaper.resetInstance();
+ }
+
}
}
catch(Throwable t) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java Thu Apr 18 14:50:00 2013
@@ -19,124 +19,99 @@
package org.apache.uima.ducc.orchestrator.maintenance;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.util.ArrayList;
-import java.util.Hashtable;
import java.util.Iterator;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.uima.ducc.common.exception.DuccConfigurationException;
+import org.apache.uima.ducc.common.mq.MqHelper;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesHelper;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
-
public class MqReaper {
private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(MqReaper.class.getName());
+
+ private static DuccId duccId = null;
+
+ private MqHelper mqHelper = null;
- private static MqReaper mqReaper = new MqReaper();
+ private DuccPropertiesResolver duccPropertiesResolver = null;
- public static MqReaper getInstance() {
- return mqReaper;
- }
+ private String jd_queue_prefix = null;
+ private String jd_queue_prefix_default = "ducc.jd.queue.";
- private String ducc_broker_name = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_hostname);
- private String ducc_broker_jmx_port = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_jmx_port);
- private String ducc_broker_url = "service:jmx:rmi:///jndi/rmi://"+ducc_broker_name+":"+ducc_broker_jmx_port+"/jmxrmi";
- private String ducc_jd_queue_prefix = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_jd_queue_prefix);
- private String objectName = "org.apache.activemq:BrokerName="+ducc_broker_name+",Type=Broker";
+ private static MqReaper instance = null;
- private JMXServiceURL url;
- private JMXConnector jmxc;
- private MBeanServerConnection conn;
- private ObjectName activeMQ;
- private BrokerViewMBean mbean;
+ public static MqReaper getInstance() throws MalformedObjectNameException, DuccConfigurationException, IOException {
+ if(instance == null) {
+ instance = new MqReaper();
+ }
+ return instance;
+ }
- private boolean mqConnected = false;
+ public static void resetInstance() {
+ instance = null;
+ }
- public MqReaper() {
+ public MqReaper() throws MalformedObjectNameException, DuccConfigurationException, IOException {
+ resolve();
init();
}
- private void init() {
+ private void init() throws MalformedObjectNameException, DuccConfigurationException, IOException {
String location = "init";
- if(ducc_broker_name == null) {
- ducc_broker_name = "localhost";
- }
- if(ducc_broker_jmx_port == null) {
- ducc_broker_jmx_port = "1099";
- }
- if(ducc_jd_queue_prefix == null) {
- ducc_broker_jmx_port = "ducc.jd.queue.";
- }
- logger.info(location,null,DuccPropertiesResolver.ducc_broker_name+":"+ducc_broker_name);
- logger.info(location,null,DuccPropertiesResolver.ducc_broker_jmx_port+":"+ducc_broker_jmx_port);
- logger.info(location,null,DuccPropertiesResolver.ducc_broker_url+":"+ducc_broker_url);
- logger.info(location,null,DuccPropertiesResolver.ducc_jd_queue_prefix+":"+ducc_jd_queue_prefix);
- logger.info(location,null,"objectName"+":"+objectName);
- }
-
- private boolean mqConnect() {
- String location = "mqConnect";
- if(!mqConnected) {
- try {
- url = new JMXServiceURL(ducc_broker_url);
- }
- catch (MalformedURLException e) {
- logger.error(location, null, e);
- return mqConnected;
- }
- try {
- jmxc = JMXConnectorFactory.connect(url);
- }
- catch (IOException e) {
- logger.error(location, null, e);
- return mqConnected;
- }
- try {
- conn = jmxc.getMBeanServerConnection();
- }
- catch (IOException e) {
- logger.error(location, null, e);
- return mqConnected;
- }
- try {
- activeMQ = new ObjectName(objectName);
- }
- catch (MalformedObjectNameException e) {
- logger.error(location, null, e);
- return mqConnected;
- }
- catch (NullPointerException e) {
- logger.error(location, null, e);
- return mqConnected;
- }
- mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ, BrokerViewMBean.class, true);
- mqConnected = true;
- }
- return mqConnected;
- }
-
- private boolean isEqual(String a, String b) {
- boolean retVal = false;
- if(a != null) {
- if(b != null) {
- if(a.equals(b)) {
- retVal = true;
- }
+ try {
+ mqHelper = new MqHelper();
+ } catch (MalformedObjectNameException e) {
+ logger.error(location,duccId,e);
+ throw e;
+ } catch (NullPointerException e) {
+ logger.error(location,duccId,e);
+ throw e;
+ } catch (DuccConfigurationException e) {
+ logger.error(location,duccId,e);
+ throw e;
+ } catch (IOException e) {
+ logger.error(location,duccId,e);
+ throw e;
+ }
+ logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_hostname+":"+mqHelper.get_broker_hostname());
+ logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_jmx_port+":"+mqHelper.get_broker_jmx_port());
+ logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_name+":"+mqHelper.get_broker_name());
+ logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_url+":"+mqHelper.get_broker_url());
+ }
+
+ private void resolve() {
+ String location = "resolve";
+ jd_queue_prefix = getDuccProperty(DuccPropertiesResolver.ducc_jd_queue_prefix, jd_queue_prefix_default);
+ logger.info(location,duccId,DuccPropertiesResolver.ducc_jd_queue_prefix+":"+jd_queue_prefix);
+ }
+
+ private void configure() {
+ duccPropertiesResolver = DuccPropertiesHelper.configure();
+ }
+
+ private String getDuccProperty(String key, String defaultValue) {
+ if(duccPropertiesResolver == null) {
+ configure();
+ }
+ String value = duccPropertiesResolver.getFileProperty(key);
+ if(value == null) {
+ value = defaultValue;
+ }
+ else {
+ value = value.trim();
+ if(value.length() == 0) {
+ value = defaultValue;
}
}
- return retVal;
+ return value;
}
private boolean isStartsWith(String a, String b) {
@@ -151,77 +126,75 @@ public class MqReaper {
return retVal;
}
- public ArrayList<String> getJdQueues() {
+ private ArrayList<String> getJdQueues() {
String location = "getJdQueues";
- ArrayList<String> jdQueues = new ArrayList<String>();
- if(mqConnect()) {
- try {
- ObjectName[] queues = mbean.getQueues();
- for( ObjectName queue : queues ) {
- Hashtable<String, String> propertyTable = queue.getKeyPropertyList();
- if(propertyTable != null) {
- String type = propertyTable.get("Type");
- String destination = propertyTable.get("Destination");
- if(isEqual(type, "Queue")) {
- if(isStartsWith(destination, ducc_jd_queue_prefix)) {
- logger.trace(location, null, "consider:"+destination);
- jdQueues.add(destination);
- }
- else {
- logger.trace(location, null, "skip:"+destination);
- }
- }
- else {
- logger.trace(location, null, "type:"+type+" "+"destination:"+destination);
- }
- }
- else {
- logger.trace(location, null, "propertyTable:"+propertyTable);
- }
- }
+ ArrayList<String> qList = mqHelper.getQueueList();
+ ArrayList<String> jdQueueList = new ArrayList<String>();
+ for(String qName : qList) {
+ if(isStartsWith(qName,jd_queue_prefix)) {
+ jdQueueList.add(qName);
}
- catch(Throwable t) {
- logger.trace(location, null, t);
+ else {
+ logger.debug(location, duccId, "queue ignore:"+qName);
}
}
- return jdQueues;
+ return jdQueueList;
}
- public void removeUnusedJdQueues(DuccWorkMap workMap) {
+ private String lastMessage = "";
+ private long lastTime = System.currentTimeMillis();
+
+ private void pruningStatus(int size, int removed) {
+ String location = "pruningStatus";
+ String message = "size:"+size+" "+"removed:"+removed;
+ long time = System.currentTimeMillis();
+ if(message.equals(lastMessage)) {
+ long elapsed = time - lastTime;
+ if(elapsed > 1000*60*60) {
+ logger.info(location, duccId, message);
+ lastTime = time;
+ }
+ }
+ else {
+ logger.info(location, duccId, message);
+ lastMessage = message;
+ lastTime = time;
+ }
+ }
+
+ public int removeUnusedJdQueues(DuccWorkMap workMap) {
String location = "removeUnusedJdQueues";
+ int removed = 0;
try {
- ArrayList<String> queues = getJdQueues();
+ ArrayList<String> qList = getJdQueues();
Iterator<DuccId> iterator = workMap.getJobKeySet().iterator();
while( iterator.hasNext() ) {
DuccId jobId = iterator.next();
- String jqKeep = ducc_jd_queue_prefix+jobId.getFriendly();
- if(queues.remove(jqKeep)) {
- logger.debug(location, null, "queue keep:"+jqKeep);
+ String jqKeep = jd_queue_prefix+jobId.getFriendly();
+ if(qList.remove(jqKeep)) {
+ logger.debug(location, duccId, "queue keep:"+jqKeep);
}
else {
- logger.trace(location, null, "queue not found:"+jqKeep);
+ logger.trace(location, duccId, "queue not found:"+jqKeep);
}
}
- for( String jqDiscard : queues ) {
- logger.info(location, null, "queue discard:"+jqDiscard);
+ for( String qName : qList ) {
+ logger.info(location, duccId, "queue discard:"+qName);
try {
- mbean.removeQueue(jqDiscard);
+ mqHelper.removeQueue(qName);
+ removed++;
}
catch(Throwable t) {
- logger.error(location, null, t);
- mqConnected = false;
+ logger.error(location, duccId, t);
+ init();
}
}
+ pruningStatus(qList.size(),removed);
}
catch(Throwable t) {
- logger.error(location, null, t);
+ logger.error(location, duccId, t);
}
- }
-
- public static void main(String[] args) {
- MqReaper mqr = MqReaper.getInstance();
- DuccWorkMap workMap = new DuccWorkMap();
- mqr.removeUnusedJdQueues(workMap);
+ return removed;
}
}