You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/05/11 16:15:58 UTC

svn commit: r773569 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/jmx/ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/kaha/impl/async/ src/main/java/org/apache/activemq/store/amq/ src/test/java/org/apache/activemq/xbean/

Author: dejanb
Date: Mon May 11 14:15:58 2009
New Revision: 773569

URL: http://svn.apache.org/viewvc?rev=773569&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2245 - broker restart

Modified:
    activemq/trunk/activemq-core/   (props changed)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java

Propchange: activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon May 11 14:15:58 2009
@@ -1,4 +1,3 @@
-
 target
 foo
 activemq-data
@@ -16,3 +15,6 @@
 derbydb
 testJdbcConfig
 amqstore
+broker1
+kahadir
+data

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon May 11 14:15:58 2009
@@ -31,12 +31,13 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
-import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
 import org.apache.activemq.broker.ft.MasterConnector;
@@ -72,6 +73,7 @@
 import org.apache.activemq.proxy.ProxyConnector;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.security.SecurityContext;
+import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@@ -178,6 +180,8 @@
     private int systemExitOnShutdownExitCode;
     private SslContext sslContext;
     
+    private boolean forceStart = false;
+    
     static {
         String localHostName = "localhost";
         try {
@@ -418,6 +422,11 @@
         return started.get();
     }
 
+    public void start(boolean force) throws Exception {
+    	forceStart = force;
+    	start();
+    }
+    
     // Service interface
     // -------------------------------------------------------------------------
     public void start() throws Exception {
@@ -456,12 +465,22 @@
             startDestinations();
 
             addShutdownHook();
-
+            
+            getBroker().start();
+            
             if (isUseJmx()) {
-                getManagementContext().start();
+            	getManagementContext().start();
+                ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
+                managedBroker.setContextBroker(broker);
+                adminView = new BrokerView(this, managedBroker);
+                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+                if (mbeanServer != null) {
+                    ObjectName objectName = getBrokerObjectName();
+                    mbeanServer.registerMBean(adminView, objectName);
+                    registeredMBeanNames.add(objectName);
+                }
             }
             
-            getBroker().start();
             BrokerRegistry.getInstance().bind(getBrokerName(), this);
             
            // see if there is a MasterBroker service and if so, configure
@@ -532,6 +551,7 @@
                     }
                 }
             }
+            registeredMBeanNames.clear();
             stopper.stop(getManagementContext());
         }
         // Clear SelectorParser cache to free memory
@@ -1585,30 +1605,27 @@
 
         // Add a filter that will stop access to the broker once stopped
         broker = new MutableBrokerFilter(broker) {
-            public void stop() throws Exception {
-                Broker old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
+        	Broker old;
+            
+        	public void stop() throws Exception {
+                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
                     // Just ignore additional stop actions.
                     public void stop() throws Exception {
                     }
+                    
                 });
                 old.stop();
             }
+        	
+        	public void start() throws Exception {
+        		if (forceStart && old != null) {
+        			this.next.set(old);
+        		}
+        		getNext().start();
+        	}
+            
         };
 
-//        RegionBroker rBroker = (RegionBroker)regionBroker;
-
-        if (isUseJmx()) {
-            ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
-            managedBroker.setContextBroker(broker);
-            adminView = new BrokerView(this, managedBroker);
-            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
-            if (mbeanServer != null) {
-                ObjectName objectName = getBrokerObjectName();
-                mbeanServer.registerMBean(adminView, objectName);
-                registeredMBeanNames.add(objectName);
-            }
-        }
-
         return broker;
 
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Mon May 11 14:15:58 2009
@@ -20,6 +20,7 @@
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -62,6 +63,7 @@
     private AtomicBoolean started = new AtomicBoolean(false);
     private JMXConnectorServer connectorServer;
     private ObjectName namingServiceObjectName;
+    private Registry registry;
 
     public ManagementContext() {
         this(null);
@@ -121,6 +123,7 @@
                     MBeanServerFactory.releaseMBeanServer(beanServer);
                 }
             }
+            beanServer = null;
         }
     }
 
@@ -361,7 +364,9 @@
     private void createConnector(MBeanServer mbeanServer) throws MalformedObjectNameException, MalformedURLException, IOException {
         // Create the NamingService, needed by JSR 160
         try {
-            LocateRegistry.createRegistry(connectorPort);
+        	if (registry == null) {
+        		registry = LocateRegistry.createRegistry(connectorPort);
+        	}
             namingServiceObjectName = ObjectName.getInstance("naming:type=rmiregistry");
             // Do not use the createMBean as the mx4j jar may not be in the
             // same class loader than the server

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon May 11 14:15:58 2009
@@ -184,6 +184,11 @@
         ServiceStopper ss = new ServiceStopper();
         doStop(ss);
         ss.throwFirstException();
+        // clear the state
+        clientIdSet.clear();
+        connections.clear();
+        destinations.clear();
+        brokerInfos.clear();
     }
 
     public PolicyMap getDestinationPolicy() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon May 11 14:15:58 2009
@@ -90,7 +90,7 @@
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
 
     protected DataFileAppender appender;
-    protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+    protected DataFileAccessorPool accessorPool;
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
     protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
@@ -120,6 +120,7 @@
         preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
         lock();
 
+        accessorPool = new DataFileAccessorPool(this);
         ByteSequence sequence = controlFile.load();
         if (sequence != null && sequence.getLength() > 0) {
             unmarshallState(sequence);
@@ -197,7 +198,7 @@
 
     public void lock() throws IOException {
         synchronized (this) {
-            if (controlFile == null) {
+            if (controlFile == null || controlFile.isDisposed()) {
                 IOHelper.mkdirs(directory);
                 controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Mon May 11 14:15:58 2009
@@ -179,4 +179,8 @@
         }
     }
 
+	public boolean isDisposed() {
+		return disposed;
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon May 11 14:15:58 2009
@@ -203,7 +203,7 @@
                     asyncDataManager.lock();
                     break;
                 } catch (IOException e) {
-                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
+                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
                     try {
                         Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
                     } catch (InterruptedException e1) {
@@ -325,6 +325,7 @@
         topics.clear();
         IOException firstException = null;
         referenceStoreAdapter.stop();
+        referenceStoreAdapter = null;
         try {
             LOG.debug("Journal close");
             asyncDataManager.close();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java?rev=773569&r1=773568&r2=773569&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Mon May 11 14:15:58 2009
@@ -19,8 +19,16 @@
 import java.net.URI;
 import java.util.List;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
 import junit.framework.TestCase;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
@@ -52,6 +60,39 @@
         assertEquals(new ActiveMQTopic("include.test.bar"), dynamicallyIncludedDestinations.get(1));
 
     }
+    
+    public void testBrokerRestartFails() throws Exception {
+    	brokerService.stop();
+    	brokerService.waitUntilStopped();
+    	
+    	try {
+    		brokerService.start();
+    	} catch (Exception e) {
+    		return;
+    	}
+    	fail("Error broker should have prevented us from starting it again");
+    }
+    
+    public void testForceBrokerRestart() throws Exception {
+    	brokerService.stop();
+    	brokerService.waitUntilStopped();
+    	
+    	brokerService.start(true); // force restart
+    	brokerService.waitUntilStarted();
+    	
+    	//send and receive a message from a restarted broker
+    	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
+    	Connection conn = factory.createConnection();
+    	Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    	conn.start();
+    	Destination dest = new ActiveMQQueue("test");
+    	MessageProducer producer = sess.createProducer(dest);
+    	MessageConsumer consumer = sess.createConsumer(dest);
+    	producer.send(sess.createTextMessage("test"));
+    	TextMessage msg = (TextMessage)consumer.receive(1000);
+    	assertEquals("test", msg.getText());
+    }
+
 
     protected void setUp() throws Exception {
         brokerService = createBroker();