You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2013/04/18 16:50:00 UTC

svn commit: r1469356 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: ORTracer.java maintenance/MaintenanceThread.java maintenance/MqReaper.java

Author: degenaro
Date: Thu Apr 18 14:50:00 2013
New Revision: 1469356

URL: http://svn.apache.org/r1469356
Log:
UIMA-2819 DUCC orchestrator (OR): refactor the MQ reaper for reusability and use broker.name correctly

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java Thu Apr 18 14:50:00 2013
@@ -39,6 +39,6 @@ public class ORTracer implements Process
 	@Override
 	public void process(Exchange arg0) throws Exception {
 		String location = "process";
-		logger.info(location, jobid, name);
+		logger.debug(location, jobid, name);
 	}
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java Thu Apr 18 14:50:00 2013
@@ -44,7 +44,6 @@ public class MaintenanceThread extends T
 	
 	private StateManager stateManager = StateManager.getInstance();
 	private HealthMonitor healthMonitor = HealthMonitor.getInstance();
-	private MqReaper mqReaper = MqReaper.getInstance();
 	
 	private long minMillis = 1000;
 	private long wakeUpMillis = 2*60*1000;
@@ -102,7 +101,14 @@ public class MaintenanceThread extends T
 				if(isTime()) {
 					stateManager.prune(workMap);
 					healthMonitor.ajudicate();
-					mqReaper.removeUnusedJdQueues(workMap);
+					try {
+						MqReaper mqReaper = MqReaper.getInstance();
+						mqReaper.removeUnusedJdQueues(workMap);
+					}
+					catch(Exception e) {
+						MqReaper.resetInstance();
+					}
+					
 				}
 			}
 			catch(Throwable t) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java?rev=1469356&r1=1469355&r2=1469356&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java Thu Apr 18 14:50:00 2013
@@ -19,124 +19,99 @@
 package org.apache.uima.ducc.orchestrator.maintenance;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.util.ArrayList;
-import java.util.Hashtable;
 import java.util.Iterator;
 
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
 
-import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.uima.ducc.common.exception.DuccConfigurationException;
+import org.apache.uima.ducc.common.mq.MqHelper;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesHelper;
 import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
 
-
 public class MqReaper {
 	
 	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(MqReaper.class.getName());
+
+	private static DuccId duccId = null;
+
+	private MqHelper mqHelper = null;
 	
-	private static MqReaper mqReaper = new MqReaper();
+	private DuccPropertiesResolver duccPropertiesResolver = null;
 	
-	public static MqReaper getInstance() {
-		return mqReaper;
-	}
+	private String jd_queue_prefix = null;
+	private String jd_queue_prefix_default = "ducc.jd.queue.";
 	
-	private String ducc_broker_name = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_hostname);
-	private String ducc_broker_jmx_port = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_broker_jmx_port);
-	private String ducc_broker_url = "service:jmx:rmi:///jndi/rmi://"+ducc_broker_name+":"+ducc_broker_jmx_port+"/jmxrmi";
-	private String ducc_jd_queue_prefix = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_jd_queue_prefix);
-	private String objectName = "org.apache.activemq:BrokerName="+ducc_broker_name+",Type=Broker";
+	private static MqReaper instance = null;
 	
-	private JMXServiceURL url;
-	private JMXConnector jmxc;
-	private MBeanServerConnection conn;
-	private ObjectName activeMQ;
-	private BrokerViewMBean mbean;
+	public static MqReaper getInstance() throws MalformedObjectNameException, DuccConfigurationException, IOException {
+		if(instance == null) {
+			instance = new MqReaper();
+		}
+		return instance;
+	}
 	
-	private boolean mqConnected = false;
+	public static void resetInstance() {
+		instance = null;
+	}
 	
-	public MqReaper() {
+	public MqReaper() throws MalformedObjectNameException, DuccConfigurationException, IOException {
+		resolve();
 		init();
 	}
 	
-	private void init() {
+	private void init() throws MalformedObjectNameException, DuccConfigurationException, IOException {
 		String location = "init";
-		if(ducc_broker_name == null) {
-			ducc_broker_name = "localhost";
-		}
-		if(ducc_broker_jmx_port == null) {
-			ducc_broker_jmx_port = "1099";
-		}
-		if(ducc_jd_queue_prefix == null) {
-			ducc_broker_jmx_port = "ducc.jd.queue.";
-		}
-		logger.info(location,null,DuccPropertiesResolver.ducc_broker_name+":"+ducc_broker_name);
-		logger.info(location,null,DuccPropertiesResolver.ducc_broker_jmx_port+":"+ducc_broker_jmx_port);
-		logger.info(location,null,DuccPropertiesResolver.ducc_broker_url+":"+ducc_broker_url);
-		logger.info(location,null,DuccPropertiesResolver.ducc_jd_queue_prefix+":"+ducc_jd_queue_prefix);
-		logger.info(location,null,"objectName"+":"+objectName);
-	}
-	
-	private boolean mqConnect() {
-		String location = "mqConnect";
-		if(!mqConnected) {
-			try {
-				url = new JMXServiceURL(ducc_broker_url);
-			} 
-			catch (MalformedURLException e) {
-				logger.error(location, null, e);
-				return mqConnected;
-			}
-			try {
-				jmxc = JMXConnectorFactory.connect(url);
-			} 
-			catch (IOException e) {
-				logger.error(location, null, e);
-				return mqConnected;
-			}
-			try {
-				conn = jmxc.getMBeanServerConnection();
-			} 
-			catch (IOException e) {
-				logger.error(location, null, e);
-				return mqConnected;
-			}
-			try {
-				activeMQ = new ObjectName(objectName);
-			} 
-			catch (MalformedObjectNameException e) {
-				logger.error(location, null, e);
-				return mqConnected;
-			} 
-			catch (NullPointerException e) {
-				logger.error(location, null, e);
-				return mqConnected;
-			}
-			mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ, BrokerViewMBean.class, true);
-			mqConnected = true;
-		}
-		return mqConnected;
-	}
-	
-	private boolean isEqual(String a, String b) {
-		boolean retVal = false;
-		if(a != null) {
-			if(b != null) {
-				if(a.equals(b)) {
-					retVal = true;
-				}
+		try {
+			mqHelper = new MqHelper();
+		} catch (MalformedObjectNameException e) {
+			logger.error(location,duccId,e);
+			throw e;
+		} catch (NullPointerException e) {
+			logger.error(location,duccId,e);
+			throw e;
+		} catch (DuccConfigurationException e) {
+			logger.error(location,duccId,e);
+			throw e;
+		} catch (IOException e) {
+			logger.error(location,duccId,e);
+			throw e;
+		}
+		logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_hostname+":"+mqHelper.get_broker_hostname());
+		logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_jmx_port+":"+mqHelper.get_broker_jmx_port());
+		logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_name+":"+mqHelper.get_broker_name());
+		logger.info(location,duccId,DuccPropertiesResolver.ducc_broker_url+":"+mqHelper.get_broker_url());
+	}
+	
+	private void resolve() {
+		String location = "resolve";
+		jd_queue_prefix = getDuccProperty(DuccPropertiesResolver.ducc_jd_queue_prefix, jd_queue_prefix_default);
+		logger.info(location,duccId,DuccPropertiesResolver.ducc_jd_queue_prefix+":"+jd_queue_prefix);
+	}
+	
+	private void configure() {
+		duccPropertiesResolver = DuccPropertiesHelper.configure();
+	}
+	
+	private String getDuccProperty(String key, String defaultValue) {
+		if(duccPropertiesResolver == null) {
+			configure();
+		}
+		String value = duccPropertiesResolver.getFileProperty(key);
+		if(value == null) {
+			value = defaultValue;
+		}
+		else {
+			value = value.trim();
+			if(value.length() == 0) {
+				value = defaultValue;
 			}
 		}
-		return retVal;
+		return value;
 	}
 	
 	private boolean isStartsWith(String a, String b) {
@@ -151,77 +126,75 @@ public class MqReaper {
 		return retVal;
 	}
 	
-	public ArrayList<String> getJdQueues() {
+	private ArrayList<String> getJdQueues() {
 		String location = "getJdQueues";
-		ArrayList<String> jdQueues = new ArrayList<String>();
-		if(mqConnect()) {
-			try {
-			ObjectName[] queues = mbean.getQueues();
-				for( ObjectName queue : queues ) {
-					Hashtable<String, String> propertyTable = queue.getKeyPropertyList();
-					if(propertyTable != null) {
-						String type = propertyTable.get("Type");
-						String destination = propertyTable.get("Destination");
-						if(isEqual(type, "Queue")) {
-							if(isStartsWith(destination, ducc_jd_queue_prefix)) {
-								logger.trace(location, null, "consider:"+destination);
-								jdQueues.add(destination);
-							}
-							else {
-								logger.trace(location, null, "skip:"+destination);
-							}
-						}
-						else {
-							logger.trace(location, null, "type:"+type+" "+"destination:"+destination);
-						}
-					}
-					else {
-						logger.trace(location, null, "propertyTable:"+propertyTable);
-					}
-				}
+		ArrayList<String> qList = mqHelper.getQueueList();
+		ArrayList<String> jdQueueList = new ArrayList<String>();
+		for(String qName : qList) {
+			if(isStartsWith(qName,jd_queue_prefix)) {
+				jdQueueList.add(qName);
 			}
-			catch(Throwable t) {
-				logger.trace(location, null, t);
+			else {
+				logger.debug(location, duccId, "queue ignore:"+qName);
 			}
 		}
-		return jdQueues;
+		return jdQueueList;
 	}
 	
-	public void removeUnusedJdQueues(DuccWorkMap workMap) {
+	private String lastMessage = "";
+	private long lastTime = System.currentTimeMillis();
+	
+	private void pruningStatus(int size, int removed) {
+		String location = "pruningStatus";
+		String message = "size:"+size+" "+"removed:"+removed;
+		long time = System.currentTimeMillis();
+		if(message.equals(lastMessage)) {
+			long elapsed = time - lastTime;
+			if(elapsed > 1000*60*60) {
+				logger.info(location, duccId, message);
+				lastTime = time;
+			}
+		}
+		else {
+			logger.info(location, duccId, message);
+			lastMessage = message;
+			lastTime = time;
+		}
+	}
+	
+	public int removeUnusedJdQueues(DuccWorkMap workMap) {
 		String location = "removeUnusedJdQueues";
+		int removed = 0;
 		try {
-			ArrayList<String> queues = getJdQueues();
+			ArrayList<String> qList = getJdQueues();
 			Iterator<DuccId> iterator = workMap.getJobKeySet().iterator();
 			while( iterator.hasNext() ) {
 				DuccId jobId = iterator.next();
-				String jqKeep = ducc_jd_queue_prefix+jobId.getFriendly();
-				if(queues.remove(jqKeep)) {
-					logger.debug(location, null, "queue keep:"+jqKeep);
+				String jqKeep = jd_queue_prefix+jobId.getFriendly();
+				if(qList.remove(jqKeep)) {
+					logger.debug(location, duccId, "queue keep:"+jqKeep);
 				}
 				else {
-					logger.trace(location, null, "queue not found:"+jqKeep);
+					logger.trace(location, duccId, "queue not found:"+jqKeep);
 				}
 			}
-			for( String jqDiscard : queues ) {
-				logger.info(location, null, "queue discard:"+jqDiscard);
+			for( String qName : qList ) {
+				logger.info(location, duccId, "queue discard:"+qName);
 				try {
-					mbean.removeQueue(jqDiscard); 
+					mqHelper.removeQueue(qName);
+					removed++;
 				}
 				catch(Throwable t) {
-					logger.error(location, null, t);
-					mqConnected = false;
+					logger.error(location, duccId, t);
+					init();
 				}
 			}
+			pruningStatus(qList.size(),removed);
 		}
 		catch(Throwable t) {
-			logger.error(location, null, t);
+			logger.error(location, duccId, t);
 		}
-	}
-	
-	public static void main(String[] args) {
-		MqReaper mqr = MqReaper.getInstance();
-		DuccWorkMap workMap = new DuccWorkMap();
-		mqr.removeUnusedJdQueues(workMap);
+		return removed;
 	}
 	
 }