You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Leen Toelen <to...@gmail.com> on 2010/06/14 09:43:50 UTC

ActiveMQ max queue size delayer

Hi,

I have written a little 'delayer' bean that introspects a queue size via
JMX. Is is quite useful in my environment to put this logic on the producer
side without being blocked by producer flow control. Does anyone know how to
do this without falling back to JMX. I haven't found a way to do this using
the activemq api.

Regards,
Leen

import java.io.IOException;
import java.util.Hashtable;

import javax.management.JMException;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.QueueViewMBean;

public class ActiveMqQueueDelayer {
public static final String METHOD_NAME = "calculateDelay";
private long maxSize = 10;
private final JMXServiceURL jmxServiceURL;
private String brokerName;
private final QueueViewMBean qv;
private String type = "Queue";
private String destination = "";
private String domain = "org.apache.activemq";
private long delay;

public ActiveMqQueueDelayer(String serviceURL, String brokerName,
String destination) throws IOException, JMException {
this(new JMXServiceURL(serviceURL), brokerName, destination);
}

public ActiveMqQueueDelayer(JMXServiceURL url, String brokerName,
String destination) throws IOException, JMException {

this.jmxServiceURL = url;
this.brokerName = brokerName;
this.destination = destination;
this.qv = getQueueViewMBean();
}

private QueueViewMBean getQueueViewMBean() throws IOException, JMException {
JMXConnector c = JMXConnectorFactory.connect(jmxServiceURL);
MBeanServerConnection mcon = c.getMBeanServerConnection();

Hashtable<String, String> props = new Hashtable<String, String>();
props.put("BrokerName", brokerName);
props.put("Type", type);
props.put("Destination", destination);
ObjectName objectName = new ObjectName(domain, props);

QueueViewMBean qv = JMX.newMBeanProxy(mcon, objectName,
QueueViewMBean.class);
return qv;
}

public long getQueueSize() {
if (qv != null) {
long size = qv.getQueueSize();
return size;
} else {
throw new IllegalStateException("QueueViewMBean is null");
}
}

public boolean isMaxSizeReached() throws IllegalStateException {
long size = getQueueSize();

if (size > maxSize) {
return true;
} else {
return false;
}
}

public long calculateDelay() {
if (isMaxSizeReached()) {
return delay;
} else {
return 0;
}
}

public long getMaxSize() {
return maxSize;
}

public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
}

public JMXServiceURL getUrl() {
return jmxServiceURL;
}

public String getBrokerName() {
return brokerName;
}

public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getDestination() {
return destination;
}

public void setDestination(String destination) {
this.destination = destination;
}

public String getDomain() {
return domain;
}

public void setDomain(String domain) {
this.domain = domain;
}

public long getDelay() {
return delay;
}

public void setDelay(long delay) {
this.delay = delay;
}

}

Re: ActiveMQ max queue size delayer

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Jun 14, 2010 at 9:43 AM, Leen Toelen <to...@gmail.com> wrote:
> Hi,
>
> I have written a little 'delayer' bean that introspects a queue size via
> JMX. Is is quite useful in my environment to put this logic on the producer
> side without being blocked by producer flow control. Does anyone know how to
> do this without falling back to JMX. I haven't found a way to do this using
> the activemq api.
>

AMQ has advisory messages which can return status about the broker.
But maybe there is a native AMQ API for that as well.
You gotta ask at the AMQ user forum.


> Regards,
> Leen
>
> import java.io.IOException;
> import java.util.Hashtable;
>
> import javax.management.JMException;
> import javax.management.JMX;
> import javax.management.MBeanServerConnection;
> import javax.management.ObjectName;
> import javax.management.remote.JMXConnector;
> import javax.management.remote.JMXConnectorFactory;
> import javax.management.remote.JMXServiceURL;
>
> import org.apache.activemq.broker.jmx.QueueViewMBean;
>
> public class ActiveMqQueueDelayer {
> public static final String METHOD_NAME = "calculateDelay";
> private long maxSize = 10;
> private final JMXServiceURL jmxServiceURL;
> private String brokerName;
> private final QueueViewMBean qv;
> private String type = "Queue";
> private String destination = "";
> private String domain = "org.apache.activemq";
> private long delay;
>
> public ActiveMqQueueDelayer(String serviceURL, String brokerName,
> String destination) throws IOException, JMException {
> this(new JMXServiceURL(serviceURL), brokerName, destination);
> }
>
> public ActiveMqQueueDelayer(JMXServiceURL url, String brokerName,
> String destination) throws IOException, JMException {
>
> this.jmxServiceURL = url;
> this.brokerName = brokerName;
> this.destination = destination;
> this.qv = getQueueViewMBean();
> }
>
> private QueueViewMBean getQueueViewMBean() throws IOException, JMException {
> JMXConnector c = JMXConnectorFactory.connect(jmxServiceURL);
> MBeanServerConnection mcon = c.getMBeanServerConnection();
>
> Hashtable<String, String> props = new Hashtable<String, String>();
> props.put("BrokerName", brokerName);
> props.put("Type", type);
> props.put("Destination", destination);
> ObjectName objectName = new ObjectName(domain, props);
>
> QueueViewMBean qv = JMX.newMBeanProxy(mcon, objectName,
> QueueViewMBean.class);
> return qv;
> }
>
> public long getQueueSize() {
> if (qv != null) {
> long size = qv.getQueueSize();
> return size;
> } else {
> throw new IllegalStateException("QueueViewMBean is null");
> }
> }
>
> public boolean isMaxSizeReached() throws IllegalStateException {
> long size = getQueueSize();
>
> if (size > maxSize) {
> return true;
> } else {
> return false;
> }
> }
>
> public long calculateDelay() {
> if (isMaxSizeReached()) {
> return delay;
> } else {
> return 0;
> }
> }
>
> public long getMaxSize() {
> return maxSize;
> }
>
> public void setMaxSize(long maxSize) {
> this.maxSize = maxSize;
> }
>
> public JMXServiceURL getUrl() {
> return jmxServiceURL;
> }
>
> public String getBrokerName() {
> return brokerName;
> }
>
> public void setBrokerName(String brokerName) {
> this.brokerName = brokerName;
> }
>
> public String getType() {
> return type;
> }
>
> public void setType(String type) {
> this.type = type;
> }
>
> public String getDestination() {
> return destination;
> }
>
> public void setDestination(String destination) {
> this.destination = destination;
> }
>
> public String getDomain() {
> return domain;
> }
>
> public void setDomain(String domain) {
> this.domain = domain;
> }
>
> public long getDelay() {
> return delay;
> }
>
> public void setDelay(long delay) {
> this.delay = delay;
> }
>
> }
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus