You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/08 16:23:23 UTC

svn commit: r961783 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/ope...

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java Thu Jul  8 14:23:21 2010
@@ -43,7 +43,7 @@ public class JDBCStoreOrderTest extends 
         while(result.next()) {
             long id = result.getLong(1);
             Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
-            LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+            LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
         }
         statement.close();
         conn.close();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Jul  8 14:23:21 2010
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -49,10 +50,14 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.SocketProxy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.After;
@@ -86,6 +91,7 @@ public class FailoverTransactionTest {
 	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {   
 	    broker = new BrokerService();
 	    broker.setUseJmx(false);
+	    broker.setAdvisorySupport(false);
 	    broker.addConnector(url);
 	    broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
 	    return broker;
@@ -114,7 +120,7 @@ public class FailoverTransactionTest {
 	}
 	
     @Test
-    public void testFailoverCommitReplyLost() throws Exception {
+    public void testFailoverCommitReplyLostAMQ() throws Exception {
         doTestFailoverCommitReplyLost(0);
     }  
     
@@ -222,15 +228,257 @@ public class FailoverTransactionTest {
         connection.close();
     }
 
+    
+    //@Test not implemented
+    public void testFailoverSendReplyLostAMQ() throws Exception {
+        doTestFailoverSendReplyLost(0);
+    }  
+    
+    @Test
+    public void testFailoverSendReplyLostJdbc() throws Exception {
+        doTestFailoverSendReplyLost(1);
+    }
+    
+    @Test
+    public void testFailoverSendReplyLostKahaDB() throws Exception {
+        doTestFailoverSendReplyLost(2);
+    }
+    
+    public void doTestFailoverSendReplyLost(final int adapter) throws Exception {
+        
+        broker = createBroker(true);
+        setPersistenceAdapter(adapter);
+            
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    @Override
+                    public void send(ProducerBrokerExchange producerExchange,
+                            org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        // so send will hang as if reply is lost
+                        super.send(producerExchange, messageSend);
+                        producerExchange.getConnectionContext().setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            public void run() {
+                                LOG.info("Stopping broker post send...");
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                    }
+                }
+        });
+        broker.start();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        final CountDownLatch sendDoneLatch = new CountDownLatch(1);
+        // broker will die on send reply so this will hang till restart
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async send...");
+                try {
+                    produceMessage(session, destination);
+                } catch (JMSException e) {
+                    //assertTrue(e instanceof TransactionRolledBackException);
+                    LOG.error("got send exception: ", e);
+                    fail("got unexpected send exception" + e);
+                }
+                sendDoneLatch.countDown();
+                LOG.info("done async send");
+            }
+        });
+       
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        LOG.info("restarting....");
+        broker.start();
+
+        assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        // the message sent before the restart should be delivered exactly once
+        Message msg = consumer.receive(20000);
+        LOG.info("Received: " + msg);
+        assertNotNull("we got the message", msg);
+        assertNull("we got just one message", consumer.receive(2000));
+        consumer.close();
+        connection.close();
+        
+        // verify stats
+        assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
+        
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+        
+        LOG.info("Checking for remaining/hung messages with second restart..");
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+        
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        connection = cf.createConnection();
+        connection.start();
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session2.createConsumer(destination);
+        msg = consumer.receive(1000);
+        if (msg == null) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
+        connection.close();
+    }
+
+    // not implemented.. @Test
+    public void testFailoverConnectionSendReplyLostAMQ() throws Exception {
+        doTestFailoverConnectionSendReplyLost(0);
+    }  
+    
+    @Test
+    public void testFailoverConnectionSendReplyLostJdbc() throws Exception {
+        doTestFailoverConnectionSendReplyLost(1);
+    }
+    
+    @Test
+    public void testFailoverConnectionSendReplyLostKahaDB() throws Exception {
+        doTestFailoverConnectionSendReplyLost(2);
+    }
+    
+    public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception {
+        
+        broker = createBroker(true);
+        setPersistenceAdapter(adapter);
+        
+        final SocketProxy proxy = new SocketProxy();
+
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    private boolean firstSend = true;
+
+                    @Override
+                    public void send(ProducerBrokerExchange producerExchange,
+                            org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        // so send will hang as if reply is lost
+                        super.send(producerExchange, messageSend);
+                        if (firstSend) {
+                            firstSend = false;
+                        
+                            producerExchange.getConnectionContext().setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                                public void run() {
+                                    LOG.info("Stopping connection post send...");
+                                    try {
+                                        proxy.close();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }   
+                            });
+                        }
+                    }
+                }
+        });
+        broker.start();
+        
+        proxy.setTarget(new URI(url));
+        proxy.open();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        final CountDownLatch sendDoneLatch = new CountDownLatch(1);
+        // proxy connection will die on send reply so this will hang on failover reconnect till open
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async send...");
+                try {
+                    produceMessage(session, destination);
+                } catch (JMSException e) {
+                    //assertTrue(e instanceof TransactionRolledBackException);
+                    LOG.info("got send exception: ", e);
+                }
+                sendDoneLatch.countDown();
+                LOG.info("done async send");
+            }
+        });
+       
+        // will be closed by the plugin
+        assertTrue("proxy was closed", proxy.waitUntilClosed(30));
+        LOG.info("restarting proxy");
+        proxy.open();
+
+        assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+        
+        Message msg = consumer.receive(20000);
+        LOG.info("Received: " + msg);
+        assertNotNull("we got the message", msg);
+        assertNull("we got just one message", consumer.receive(2000));
+        consumer.close();
+        connection.close();
+        
+        // verify stats, connection dup suppression means dups don't get to broker
+        assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        
+        // ensure no dangling messages with fresh broker etc
+        broker.stop();
+        broker.waitUntilStopped();
+        
+        LOG.info("Checking for remaining/hung messages with restart..");
+        broker = createBroker(false);
+        setPersistenceAdapter(adapter);
+        broker.start();
+        
+        // after restart, ensure no dangling messages
+        cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        connection = cf.createConnection();
+        connection.start();
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session2.createConsumer(destination);
+        msg = consumer.receive(1000);
+        if (msg == null) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("Received: " + msg);
+        assertNull("no messges left dangling but got: " + msg, msg);
+        connection.close();
+    }
+    
+    
+    
     private void setPersistenceAdapter(int adapter) throws IOException {
         switch (adapter) {
         case 0:
+            broker.setPersistenceAdapter(new AMQPersistenceAdapter());
             break;
         case 1:
             broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
             break;
         case 2:
             KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
+            // duplicate checker not updated on canceled tasks; even if
+            // it was, recovery of the audit would fail as the message is
+            // not recorded in the store and the audit may not be up to date.
+            // So if duplicates are a no-no (w.r.t. stats), this must be disabled
+            store.setConcurrentStoreAndDispatchQueues(false);
+            store.setMaxFailoverProducersToTrack(10);
             store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
             broker.setPersistenceAdapter(store);
             break;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java Thu Jul  8 14:23:21 2010
@@ -32,21 +32,29 @@ public class BitArrayBinTest extends Tes
         
         for (int i=0; i <= dataSize; i++) {
             assertTrue("not already set", !toTest.setBit(i, Boolean.TRUE));
+            assertEquals("current is max", i, toTest.getLastSetIndex());
         }
 
+        assertEquals("last is max", dataSize, toTest.getLastSetIndex());
+        
         int windowOfValidData = roundWindow(dataSize, window);
         int i=dataSize;
         for (; i >= dataSize -windowOfValidData; i--) {
             assertTrue("was already set, id=" + i, toTest.setBit(i, Boolean.TRUE));
         }
+
+        assertEquals("last is still max", dataSize, toTest.getLastSetIndex());
         
         for (; i >= 0; i--) {
             assertTrue("was not already set, id=" + i, !toTest.setBit(i, Boolean.TRUE));
         }
         
-        for (int j= dataSize +1; j<(2*dataSize); j++) {
+        for (int j= dataSize +1; j<=(2*dataSize); j++) {
             assertTrue("not already set: id=" + j, !toTest.setBit(j, Boolean.TRUE));
         }
+        
+        assertEquals("last still max*2", 2*dataSize, toTest.getLastSetIndex());
+
     }
     
     public void testSetUnsetAroundWindow() throws Exception {
@@ -87,6 +95,7 @@ public class BitArrayBinTest extends Tes
                 int instance = value +muliplier*BitArray.LONG_SIZE;
                 assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE));
                 assertTrue("not already set: id=" + value, !toTest.setBit(value, Boolean.TRUE));
+                assertEquals("max set correct", instance, toTest.getLastSetIndex());
             }
         }
     }
@@ -109,6 +118,22 @@ public class BitArrayBinTest extends Tes
         assertTrue("not already set: id=" + instance,  !toTest.setBit(instance, Boolean.TRUE));
     }
     
+    
+   public void testLastSeq() throws Exception {
+       BitArrayBin toTest = new BitArrayBin(512);
+       assertEquals("last not set", -1, toTest.getLastSetIndex());
+       
+       toTest.setBit(1, Boolean.TRUE);
+       assertEquals("last correct", 1, toTest.getLastSetIndex());
+       
+       toTest.setBit(64, Boolean.TRUE);
+       assertEquals("last correct", 64, toTest.getLastSetIndex());
+       
+       toTest.setBit(68, Boolean.TRUE);
+       assertEquals("last correct", 68, toTest.getLastSetIndex());
+       
+   }
+    
     // window moves in increments of BitArray.LONG_SIZE.
     // valid data window on low end can be larger than window
     private int roundWindow(int dataSetEnd, int windowSize) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Thu Jul  8 14:23:21 2010
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +47,8 @@ public class SocketProxy {
 
     private Acceptor acceptor;
     private ServerSocket serverSocket;
+    
+    private CountDownLatch closed = new CountDownLatch(1);
 
     public List<Connection> connections = new LinkedList<Connection>();
 
@@ -87,6 +90,7 @@ public class SocketProxy {
         }
         acceptor = new Acceptor(serverSocket, target);
         new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+        closed = new CountDownLatch(1);
     }
 
     public URI getUrl() {
@@ -106,6 +110,11 @@ public class SocketProxy {
             closeConnection(con);
         }
         acceptor.close();
+        closed.countDown();
+    }
+
+    public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
+        return closed.await(timeoutSeconds, TimeUnit.SECONDS);
     }
 
     /*
@@ -303,10 +312,12 @@ public class SocketProxy {
         public void close() {
             try {
                 socket.close();
+                closed.countDown();
                 goOn();
             } catch (IOException ignored) {
             }
         }
     }
+
 }