You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/30 13:17:15 UTC
svn commit: r469135 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
advisory/ broker/ broker/util/
Author: jstrachan
Date: Mon Oct 30 04:17:14 2006
New Revision: 469135
URL: http://svn.apache.org/viewvc?view=rev&rev=469135
Log:
Added support for a simple message-based command agent so that you can send management commands to the broker over JMS.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=469135&r1=469134&r2=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Oct 30 04:17:14 2006
@@ -20,6 +20,8 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
+import javax.jms.Destination;
+
public class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
@@ -38,6 +40,8 @@
public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
+ public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent.";
+
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
@@ -167,4 +171,7 @@
}
}
+ public static Destination getAgentDestination(String brokerName) {
+ return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName);
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=469135&r1=469134&r2=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Oct 30 04:17:14 2006
@@ -51,6 +51,7 @@
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.util.CommandAgent;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
@@ -118,7 +119,7 @@
private List proxyConnectors = new CopyOnWriteArrayList();
private List registeredMBeanNames = new CopyOnWriteArrayList();
private List jmsConnectors = new CopyOnWriteArrayList();
- private Service[] services;
+ private Service[] services = new Service[] { new CommandAgent() };
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+
+/**
+ * An agent which listens to commands on a JMS destination
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: $
+ */
+public class CommandAgent implements Service, BrokerServiceAware {
+ private static final Log log = LogFactory.getLog(CommandAgent.class);
+
+ private String brokerUrl = "vm://localhost";
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Destination commandDestination;
+ private CommandMessageListener listener;
+ private Session session;
+ private MessageConsumer consumer;
+ private String brokerName = "default";
+
+ public void start() throws Exception {
+ session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+ listener = new CommandMessageListener(session);
+ Destination destination = getCommandDestination();
+ if (log.isDebugEnabled()) {
+ log.debug("Agent subscribing to control destination: " + destination);
+ }
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(listener);
+ }
+
+ public void stop() throws Exception {
+ consumer.close();
+ consumer = null;
+ session.close();
+ session = null;
+ connection.close();
+ connection = null;
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ String name = brokerService.getBrokerName();
+ if (name != null) {
+ brokerName = name;
+ }
+ }
+
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+
+ public void setBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ if (connectionFactory == null) {
+ connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ }
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public Connection getConnection() throws JMSException {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ public Destination getCommandDestination() {
+ if (commandDestination == null) {
+ commandDestination = createCommandDestination();
+ }
+ return commandDestination;
+ }
+
+ public void setCommandDestination(Destination commandDestination) {
+ this.commandDestination = commandDestination;
+ }
+
+ protected Connection createConnection() throws JMSException {
+ return getConnectionFactory().createConnection();
+ }
+
+
+ protected Destination createCommandDestination() {
+ return AdvisorySupport.getAgentDestination(brokerName);
+ }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Represents a processor of text based commands.
+ *
+ * <code>processCommand</code> is handed the incoming command message as
+ * <code>request</code> together with a <code>response</code> message for the
+ * implementation to work with. In this package the caller is
+ * CommandMessageListener, which creates the response with
+ * <code>session.createTextMessage()</code> immediately before invoking the
+ * handler. The method may throw any exception; CommandMessageListener logs
+ * it as an error.
+ *
+ * @version $Revision: $
+ */
+public interface CommandHandler {
+ void processCommand(TextMessage request, TextMessage response) throws Exception;
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+
+/**
+ * A JMS {@link MessageListener} which treats each incoming {@link TextMessage}
+ * as a command, passes it to a {@link CommandHandler} and sends the resulting
+ * response to the request's <code>JMSReplyTo</code> destination.
+ *
+ * Messages which are not text messages, or which carry no reply destination,
+ * are logged at warn level and ignored.
+ *
+ * @version $Revision: $
+ */
+public class CommandMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(CommandMessageListener.class);
+
+    private Session session;
+    private MessageProducer producer;
+    private CommandHandler handler;
+
+    /**
+     * @param session the session used to create response messages and the
+     *                producer which sends them
+     */
+    public CommandMessageListener(Session session) {
+        this.session = session;
+    }
+
+    /**
+     * Processes a single command message and replies to its sender.
+     * Any exception raised while processing or replying is logged and not
+     * propagated to the JMS provider.
+     */
+    public void onMessage(Message message) {
+        if (!(message instanceof TextMessage)) {
+            log.warn("Ignoring invalid message: " + message);
+            return;
+        }
+        TextMessage request = (TextMessage) message;
+        try {
+            Destination replyTo = request.getJMSReplyTo();
+            if (replyTo == null) {
+                log.warn("Ignored message as no JMSReplyTo set: " + message);
+                return;
+            }
+            Message response = processCommand(request);
+            addReplyHeaders(request, response);
+            // Deliver the reply; without this the requester never receives
+            // the response that was just built.
+            getProducer().send(replyTo, response);
+        }
+        catch (Exception e) {
+            log.error("Failed to process message due to: " + e + ". Message: " + message, e);
+        }
+    }
+
+    /**
+     * Copies the request's correlation ID, when it has one, onto the response
+     * so that the requester can match the reply to its request.
+     */
+    protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
+        String correlationID = request.getJMSCorrelationID();
+        if (correlationID != null) {
+            response.setJMSCorrelationID(correlationID);
+        }
+    }
+
+    /**
+     * Creates an empty text message on the session and lets the handler fill
+     * it in for the given request.
+     *
+     * @return the response message after the handler has processed the request
+     * @throws Exception if the handler cannot be created or fails
+     */
+    protected Message processCommand(TextMessage request) throws Exception {
+        TextMessage response = session.createTextMessage();
+        getHandler().processCommand(request, response);
+        return response;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    /**
+     * Returns the producer used for replies, lazily creating it on the
+     * session. It is created with a null destination, so the destination is
+     * supplied on each send.
+     */
+    public MessageProducer getProducer() throws JMSException {
+        if (producer == null) {
+            producer = getSession().createProducer(null);
+        }
+        return producer;
+    }
+
+    /**
+     * Returns the command handler, lazily creating it on first use.
+     */
+    public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
+        if (handler == null) {
+            handler = createHandler();
+        }
+        return handler;
+    }
+
+    /**
+     * Instantiates the handler through a {@link FactoryFinder} using the key
+     * "agent" under <code>META-INF/services/org/apache/activemq/broker/</code>.
+     * NOTE(review): presumably the finder reads the implementation class name
+     * from a resource at that location - confirm against FactoryFinder.
+     */
+    private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
+        FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
+        return (CommandHandler) factoryFinder.newInstance("agent");
+    }
+}