You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/12/05 19:37:22 UTC

[2/2] git commit: Fixes https://issues.apache.org/jira/browse/AMQ-4923: Replicated LevelDB: Loss of broker Quorum fails to fully stop the master

Fixes https://issues.apache.org/jira/browse/AMQ-4923: Replicated LevelDB: Loss of broker Quorum fails to fully stop the master


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ed8e4eae
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ed8e4eae
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ed8e4eae

Branch: refs/heads/trunk
Commit: ed8e4eae8f79b6f88562d3008292a1927b21786d
Parents: 044c2d9
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Dec 5 13:27:59 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Dec 5 13:38:52 2013 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   4 +
 .../org/apache/activemq/leveldb/DBManager.scala |  11 +-
 .../test/ReplicatedLevelDBBrokerTest.java       | 191 ++++++++++++++++++-
 3 files changed, 199 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 575e313..fe6f59b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -878,6 +878,10 @@ public class BrokerService implements Service {
         }
     }
 
+    public boolean isStopped() {
+        return stopped.get();
+    }
+
     /**
      * A helper method to block the caller thread until the broker has fully started
      * @return boolean true if wait succeeded false if broker was not started or was stopped

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 722d932..cfcce78 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -35,6 +35,7 @@ import util.TimeMetric
 import scala.Some
 import org.apache.activemq.ActiveMQMessageAuditNoSync
 import org.fusesource.hawtdispatch
+import org.apache.activemq.broker.SuppressReplyException
 
 case class EntryLocator(qid:Long, seq:Long)
 case class DataLocator(store:LevelDBStore, pos:Long, len:Int) {
@@ -569,9 +570,6 @@ class DBManager(val parent:LevelDBStore) {
 
   def drainFlushes:Unit = {
     dispatchQueue.assertExecuting()
-    if( !started ) {
-      return
-    }
 
     // Some UOWs may have been canceled.
     import collection.JavaConversions._
@@ -590,7 +588,12 @@ class DBManager(val parent:LevelDBStore) {
             assert(action!=null)
           }
         }
-        Some(uow)
+        if( !started ) {
+          uow.onCompleted(new SuppressReplyException("Store stopped"))
+          None
+        } else {
+          Some(uow)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed8e4eae/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
index a8e743f..8910981 100644
--- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
+++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBBrokerTest.java
@@ -22,6 +22,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.jms.*;
@@ -30,10 +31,17 @@ import javax.management.openmbean.CompositeData;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.io.FileUtils;
 
 import static org.junit.Assert.*;
@@ -82,6 +90,165 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
         }
     }
 
+    public interface Client{
+        public void execute(Connection connection) throws Exception;
+    }
+
+    protected Thread startFailoverClient(String name, final Client client) throws IOException, URISyntaxException {
+        String url = "failover://(tcp://localhost:"+port+")?maxReconnectDelay=500&nested.wireFormat.maxInactivityDuration=1000";
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+        Thread rc = new Thread(name) {
+            @Override
+            public void run() {
+                Connection connection = null;
+                try {
+                    connection = factory.createConnection();
+                    client.execute(connection);
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                } finally {
+                    try {
+                        connection.close();
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        rc.start();
+        return rc;
+    }
+
+    @Test
+    @Ignore
+    public void testReplicationQuorumLoss() throws Throwable {
+
+        System.out.println("======================================");
+        System.out.println(" Start 2 ActiveMQ nodes.");
+        System.out.println("======================================");
+        startBrokerAsync(createBrokerNode("node-1", port));
+        startBrokerAsync(createBrokerNode("node-2", port));
+        BrokerService master = waitForNextMaster();
+        System.out.println("======================================");
+        System.out.println(" Start the producer and consumer");
+        System.out.println("======================================");
+
+        final AtomicBoolean stopClients = new AtomicBoolean(false);
+        final ArrayBlockingQueue<String> errors = new ArrayBlockingQueue<String>(100);
+        final AtomicLong receivedCounter = new AtomicLong();
+        final AtomicLong sentCounter = new AtomicLong();
+        Thread producer = startFailoverClient("producer", new Client() {
+            @Override
+            public void execute(Connection connection) throws Exception {
+                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(session.createQueue("test"));
+                long actual = 0;
+                while(!stopClients.get()) {
+                    TextMessage msg = session.createTextMessage("Hello World");
+                    msg.setLongProperty("id", actual++);
+                    producer.send(msg);
+                    sentCounter.incrementAndGet();
+                }
+            }
+        });
+
+        Thread consumer = startFailoverClient("consumer", new Client() {
+            @Override
+            public void execute(Connection connection) throws Exception {
+                connection.start();
+                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(session.createQueue("test"));
+                long expected = 0;
+                while(!stopClients.get()) {
+                    Message msg = consumer.receive(200);
+                    if( msg!=null ) {
+                        long actual = msg.getLongProperty("id");
+                        if( actual != expected ) {
+                            errors.offer("Received got unexpected msg id: "+actual+", expected: "+expected);
+                        }
+                        msg.acknowledge();
+                        expected = actual+1;
+                        receivedCounter.incrementAndGet();
+                    }
+                }
+            }
+        });
+
+        try {
+            assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS);
+            assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS);
+            assertNull(errors.poll());
+
+            System.out.println("======================================");
+            System.out.println(" Master should stop once the quorum is lost.");
+            System.out.println("======================================");
+            ArrayList<BrokerService> stopped = stopSlaves();// stopping the slaves should kill the quorum.
+            assertStopsWithin(master, 10, TimeUnit.SECONDS);
+            assertNull(errors.poll()); // clients should not see an error since they are failover clients.
+            stopped.add(master);
+
+            System.out.println("======================================");
+            System.out.println(" Restart the slave. Clients should make progress again..");
+            System.out.println("======================================");
+            startBrokersAsync(createBrokerNodes(stopped));
+            assertCounterMakesProgress(sentCounter, 10, TimeUnit.SECONDS);
+            assertCounterMakesProgress(receivedCounter, 5, TimeUnit.SECONDS);
+            assertNull(errors.poll());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
+            // Wait for the clients to stop..
+            stopClients.set(true);
+            producer.join();
+            consumer.join();
+        }
+    }
+
+    protected void startBrokersAsync(ArrayList<BrokerService> brokers) {
+        for (BrokerService broker : brokers) {
+            startBrokerAsync(broker);
+        }
+    }
+
+    protected ArrayList<BrokerService> createBrokerNodes(ArrayList<BrokerService> brokers) throws Exception {
+        ArrayList<BrokerService> rc = new ArrayList<BrokerService>();
+        for (BrokerService b : brokers) {
+            rc.add(createBrokerNode(b.getBrokerName(), connectPort(b)));
+        }
+        return rc;
+    }
+
+    protected ArrayList<BrokerService> stopSlaves() throws Exception {
+        ArrayList<BrokerService> rc = new ArrayList<BrokerService>();
+        for (BrokerService broker : brokers) {
+            if( broker.isSlave() ) {
+                System.out.println("Stopping slave: "+broker.getBrokerName());
+                broker.stop();
+                broker.waitUntilStopped();
+                rc.add(broker);
+            }
+        }
+        brokers.removeAll(rc);
+        return rc;
+    }
+
+    protected void assertStopsWithin(final BrokerService master, int timeout, TimeUnit unit) throws InterruptedException {
+        within(timeout, unit, new Task(){
+            @Override
+            public void run() throws Exception {
+                assertTrue(master.isStopped());
+            }
+        });
+    }
+
+    protected void assertCounterMakesProgress(final AtomicLong counter, int timeout, TimeUnit unit) throws InterruptedException {
+        final long initial = counter.get();
+        within(timeout, unit, new Task(){
+            public void run() throws Exception {
+                assertTrue(initial < counter.get());
+            }
+        });
+    }
 
     public void testAMQ4837(boolean jmx) throws Throwable {
 
@@ -205,8 +372,7 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
 
     private ArrayList<String> browseMessagesViaJMS(BrokerService brokerService) throws Exception {
         ArrayList<String> rc = new ArrayList<String>();
-        TransportConnector connector = brokerService.getTransportConnectors().get(0);
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connector.getConnectUri());
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:"+ connectPort(brokerService));
         Connection connection = factory.createConnection();
         try {
             connection.start();
@@ -223,6 +389,19 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
         return rc;
     }
 
+    private int connectPort(BrokerService brokerService) throws IOException, URISyntaxException {
+        TransportConnector connector = brokerService.getTransportConnectors().get(0);
+        return connector.getConnectUri().getPort();
+    }
+
+    int port;
+    @Before
+    public void findFreePort() throws Exception {
+        ServerSocket socket = new ServerSocket(0);
+        port = socket.getLocalPort();
+        socket.close();
+    }
+
     @After
     public void stopBrokers() throws Exception {
         for (BrokerService broker : brokers) {
@@ -235,12 +414,18 @@ public class ReplicatedLevelDBBrokerTest extends ZooKeeperTestSupport {
     }
 
     private BrokerService createBrokerNode(String id) throws Exception {
+        return createBrokerNode(id, 0);
+    }
+
+    private BrokerService createBrokerNode(String id, int port) throws Exception {
         BrokerService bs = new BrokerService();
         bs.getManagementContext().setCreateConnector(false);
         brokers.add(bs);
         bs.setBrokerName(id);
         bs.setPersistenceAdapter(createStoreNode(id));
-        bs.addConnector("tcp://0.0.0.0:0");
+        TransportConnector connector = new TransportConnector();
+        connector.setUri(new URI("tcp://0.0.0.0:" + port));
+        bs.addConnector(connector);
         return bs;
     }