You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/30 13:17:15 UTC

svn commit: r469135 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/ broker/ broker/util/

Author: jstrachan
Date: Mon Oct 30 04:17:14 2006
New Revision: 469135

URL: http://svn.apache.org/viewvc?view=rev&rev=469135
Log:
added support for a simple message based command agent so that you can send management commands to the broker over JMS

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=469135&r1=469134&r2=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Oct 30 04:17:14 2006
@@ -20,6 +20,8 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 
+import javax.jms.Destination;
+
 public class AdvisorySupport {
     
     public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
@@ -38,6 +40,8 @@
     public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Expired.Queue.";
     public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Topic.";
     public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"NoConsumer.Queue.";
+    public static final String AGENT_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX+"Agent.";
+
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC+","+TEMP_TOPIC_ADVISORY_TOPIC);
 
@@ -167,4 +171,7 @@
         }
     }
 
+    public static Destination getAgentDestination(String brokerName) {
+        return new ActiveMQTopic(AGENT_TOPIC_PREFIX + brokerName);
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=469135&r1=469134&r2=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Oct 30 04:17:14 2006
@@ -51,6 +51,7 @@
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.util.CommandAgent;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.kaha.Store;
@@ -118,7 +119,7 @@
     private List proxyConnectors = new CopyOnWriteArrayList();
     private List registeredMBeanNames = new CopyOnWriteArrayList();
     private List jmsConnectors = new CopyOnWriteArrayList();
-    private Service[] services;
+    private Service[] services = new Service[] { new CommandAgent() };
     private MasterConnector masterConnector;
     private String masterConnectorURI;
     private transient Thread shutdownHook;

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandAgent.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+
+/**
+ * An agent which listens to commands on a JMS destination
+ *
+ * @org.apache.xbean.XBean
+ *
+ * @version $Revision: $
+ */
+public class CommandAgent implements Service, BrokerServiceAware {
+    private static final Log log = LogFactory.getLog(CommandAgent.class);
+
+    private String brokerUrl = "vm://localhost";
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Destination commandDestination;
+    private CommandMessageListener listener;
+    private Session session;
+    private MessageConsumer consumer;
+    private String brokerName = "default";
+
+    public void start() throws Exception {
+        session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+        listener = new CommandMessageListener(session);
+        Destination destination = getCommandDestination();
+        if (log.isDebugEnabled()) {
+            log.debug("Agent subscribing to control destination: " + destination);
+        }
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(listener);
+    }
+
+    public void stop() throws Exception {
+        consumer.close();
+        consumer = null;
+        session.close();
+        session = null;
+        connection.close();
+        connection = null;
+    }
+
+    public void setBrokerService(BrokerService brokerService) {
+        String name = brokerService.getBrokerName();
+        if (name != null) {
+            brokerName = name;
+        }
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        if (connectionFactory == null) {
+            connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        }
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public Connection getConnection() throws JMSException {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        return connection;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public Destination getCommandDestination() {
+        if (commandDestination == null) {
+            commandDestination = createCommandDestination();
+        }
+        return commandDestination;
+    }
+
+    public void setCommandDestination(Destination commandDestination) {
+        this.commandDestination = commandDestination;
+    }
+
+    protected Connection createConnection() throws JMSException {
+        return getConnectionFactory().createConnection();
+    }
+
+
+    protected Destination createCommandDestination() {
+        return AdvisorySupport.getAgentDestination(brokerName); 
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandHandler.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+
+/**
+ * Represents a processor of text based commands
+ *
+ * @version $Revision: $
+ */
+public interface CommandHandler {
+    void processCommand(TextMessage request, TextMessage response) throws Exception;
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java?view=auto&rev=469135
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/CommandMessageListener.java Mon Oct 30 04:17:14 2006
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import java.io.IOException;
+
+/**
+ * @version $Revision: $
+ */
+public class CommandMessageListener implements MessageListener {
+    private static final Log log = LogFactory.getLog(CommandMessageListener.class);
+
+    private Session session;
+    private MessageProducer producer;
+    private CommandHandler handler;
+
+    public CommandMessageListener(Session session) {
+        this.session = session;
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage request = (TextMessage) message;
+            try {
+                Destination replyTo = message.getJMSReplyTo();
+                if (replyTo == null) {
+                    log.warn("Ignored message as no JMSReplyTo set: " + message);
+                    return;
+                }
+                Message response = processCommand(request);
+                addReplyHeaders(request, response);
+
+            }
+            catch (Exception e) {
+                log.error("Failed to process message due to: " + e + ". Message: " + message, e);
+            }
+        }
+        else {
+            log.warn("Ignoring invalid message: " + message);
+        }
+    }
+
+    protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
+        String correlationID = request.getJMSCorrelationID();
+        if (correlationID != null) {
+            response.setJMSCorrelationID(correlationID);
+        }
+    }
+
+    protected Message processCommand(TextMessage request) throws Exception {
+        TextMessage response = session.createTextMessage();
+        getHandler().processCommand(request, response);
+        return response;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public MessageProducer getProducer() throws JMSException {
+        if (producer == null) {
+            producer = getSession().createProducer(null);
+        }
+        return producer;
+    }
+
+    public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
+        if (handler == null) {
+            handler = createHandler();
+        }
+        return handler;
+    }
+
+    private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
+        FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
+        return (CommandHandler) factoryFinder.newInstance("agent");
+    }
+}