You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/08 16:23:23 UTC
svn commit: r961783 [2/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/ main/java/org/apache/activemq/ope...
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java Thu Jul 8 14:23:21 2010
@@ -43,7 +43,7 @@ public class JDBCStoreOrderTest extends
while(result.next()) {
long id = result.getLong(1);
Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
- LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+ LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
}
statement.close();
conn.close();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Jul 8 14:23:21 2010
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -49,10 +50,14 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.SocketProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
@@ -86,6 +91,7 @@ public class FailoverTransactionTest {
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
+ broker.setAdvisorySupport(false);
broker.addConnector(url);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
return broker;
@@ -114,7 +120,7 @@ public class FailoverTransactionTest {
}
@Test
- public void testFailoverCommitReplyLost() throws Exception {
+ public void testFailoverCommitReplyLostAMQ() throws Exception {
doTestFailoverCommitReplyLost(0);
}
@@ -222,15 +228,257 @@ public class FailoverTransactionTest {
connection.close();
}
+
+ //@Test not implemented
+ public void testFailoverSendReplyLostAMQ() throws Exception {
+ doTestFailoverSendReplyLost(0);
+ }
+
+ @Test
+ public void testFailoverSendReplyLostJdbc() throws Exception {
+ doTestFailoverSendReplyLost(1);
+ }
+
+ @Test
+ public void testFailoverSendReplyLostKahaDB() throws Exception {
+ doTestFailoverSendReplyLost(2);
+ }
+
+ public void doTestFailoverSendReplyLost(final int adapter) throws Exception {
+
+ broker = createBroker(true);
+ setPersistenceAdapter(adapter);
+
+ broker.setPlugins(new BrokerPlugin[] {
+ new BrokerPluginSupport() {
+ @Override
+ public void send(ProducerBrokerExchange producerExchange,
+ org.apache.activemq.command.Message messageSend)
+ throws Exception {
+ // so send will hang as if reply is lost
+ super.send(producerExchange, messageSend);
+ producerExchange.getConnectionContext().setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker post send...");
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ });
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ final CountDownLatch sendDoneLatch = new CountDownLatch(1);
+ // broker will die on send reply so this will hang till restart
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async send...");
+ try {
+ produceMessage(session, destination);
+ } catch (JMSException e) {
+ //assertTrue(e instanceof TransactionRolledBackException);
+ LOG.error("got send exception: ", e);
+ fail("got unexpected send exception" + e);
+ }
+ sendDoneLatch.countDown();
+ LOG.info("done async send");
+ }
+ });
+
+ // will be stopped by the plugin
+ broker.waitUntilStopped();
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ LOG.info("restarting....");
+ broker.start();
+
+ assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+
+ // consume the message that was sent through failover (session is not transacted)
+ Message msg = consumer.receive(20000);
+ LOG.info("Received: " + msg);
+ assertNotNull("we got the message", msg);
+ assertNull("we got just one message", consumer.receive(2000));
+ consumer.close();
+ connection.close();
+
+ // verify stats
+ assertEquals("no newly queued messages", 0, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals("1 dequeue", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
+
+ // ensure no dangling messages with fresh broker etc
+ broker.stop();
+ broker.waitUntilStopped();
+
+ LOG.info("Checking for remaining/hung messages with second restart..");
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ // after restart, ensure no dangling messages
+ cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ connection = cf.createConnection();
+ connection.start();
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session2.createConsumer(destination);
+ msg = consumer.receive(1000);
+ if (msg == null) {
+ msg = consumer.receive(5000);
+ }
+ LOG.info("Received: " + msg);
+ assertNull("no messges left dangling but got: " + msg, msg);
+ connection.close();
+ }
+
+ // not implemented.. @Test
+ public void testFailoverConnectionSendReplyLostAMQ() throws Exception {
+ doTestFailoverConnectionSendReplyLost(0);
+ }
+
+ @Test
+ public void testFailoverConnectionSendReplyLostJdbc() throws Exception {
+ doTestFailoverConnectionSendReplyLost(1);
+ }
+
+ @Test
+ public void testFailoverConnectionSendReplyLostKahaDB() throws Exception {
+ doTestFailoverConnectionSendReplyLost(2);
+ }
+
+ public void doTestFailoverConnectionSendReplyLost(final int adapter) throws Exception {
+
+ broker = createBroker(true);
+ setPersistenceAdapter(adapter);
+
+ final SocketProxy proxy = new SocketProxy();
+
+ broker.setPlugins(new BrokerPlugin[] {
+ new BrokerPluginSupport() {
+ private boolean firstSend = true;
+
+ @Override
+ public void send(ProducerBrokerExchange producerExchange,
+ org.apache.activemq.command.Message messageSend)
+ throws Exception {
+ // so send will hang as if reply is lost
+ super.send(producerExchange, messageSend);
+ if (firstSend) {
+ firstSend = false;
+
+ producerExchange.getConnectionContext().setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping connection post send...");
+ try {
+ proxy.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+ broker.start();
+
+ proxy.setTarget(new URI(url));
+ proxy.open();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ final CountDownLatch sendDoneLatch = new CountDownLatch(1);
+ // proxy connection will die on send reply so this will hang on failover reconnect till open
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async send...");
+ try {
+ produceMessage(session, destination);
+ } catch (JMSException e) {
+ //assertTrue(e instanceof TransactionRolledBackException);
+ LOG.info("got send exception: ", e);
+ }
+ sendDoneLatch.countDown();
+ LOG.info("done async send");
+ }
+ });
+
+ // will be closed by the plugin
+ assertTrue("proxy was closed", proxy.waitUntilClosed(30));
+ LOG.info("restarting proxy");
+ proxy.open();
+
+ assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+
+ Message msg = consumer.receive(20000);
+ LOG.info("Received: " + msg);
+ assertNotNull("we got the message", msg);
+ assertNull("we got just one message", consumer.receive(2000));
+ consumer.close();
+ connection.close();
+
+ // verify stats, connection dup suppression means dups don't get to broker
+ assertEquals("one queued message", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+
+ // ensure no dangling messages with fresh broker etc
+ broker.stop();
+ broker.waitUntilStopped();
+
+ LOG.info("Checking for remaining/hung messages with restart..");
+ broker = createBroker(false);
+ setPersistenceAdapter(adapter);
+ broker.start();
+
+ // after restart, ensure no dangling messages
+ cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ connection = cf.createConnection();
+ connection.start();
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session2.createConsumer(destination);
+ msg = consumer.receive(1000);
+ if (msg == null) {
+ msg = consumer.receive(5000);
+ }
+ LOG.info("Received: " + msg);
+ assertNull("no messges left dangling but got: " + msg, msg);
+ connection.close();
+ }
+
+
+
private void setPersistenceAdapter(int adapter) throws IOException {
switch (adapter) {
case 0:
+ broker.setPersistenceAdapter(new AMQPersistenceAdapter());
break;
case 1:
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
break;
case 2:
KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
+ // duplicate checker not updated on canceled tasks, even if
+ // it was, recovery of the audit would fail as the message is
+ // not recorded in the store and the audit may not be up to date.
+ // So if duplicates are a no-no (w.r.t stats), this must be disabled
+ store.setConcurrentStoreAndDispatchQueues(false);
+ store.setMaxFailoverProducersToTrack(10);
store.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
broker.setPersistenceAdapter(store);
break;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java Thu Jul 8 14:23:21 2010
@@ -32,21 +32,29 @@ public class BitArrayBinTest extends Tes
for (int i=0; i <= dataSize; i++) {
assertTrue("not already set", !toTest.setBit(i, Boolean.TRUE));
+ assertEquals("current is max", i, toTest.getLastSetIndex());
}
+ assertEquals("last is max", dataSize, toTest.getLastSetIndex());
+
int windowOfValidData = roundWindow(dataSize, window);
int i=dataSize;
for (; i >= dataSize -windowOfValidData; i--) {
assertTrue("was already set, id=" + i, toTest.setBit(i, Boolean.TRUE));
}
+
+ assertEquals("last is still max", dataSize, toTest.getLastSetIndex());
for (; i >= 0; i--) {
assertTrue("was not already set, id=" + i, !toTest.setBit(i, Boolean.TRUE));
}
- for (int j= dataSize +1; j<(2*dataSize); j++) {
+ for (int j= dataSize +1; j<=(2*dataSize); j++) {
assertTrue("not already set: id=" + j, !toTest.setBit(j, Boolean.TRUE));
}
+
+ assertEquals("last still max*2", 2*dataSize, toTest.getLastSetIndex());
+
}
public void testSetUnsetAroundWindow() throws Exception {
@@ -87,6 +95,7 @@ public class BitArrayBinTest extends Tes
int instance = value +muliplier*BitArray.LONG_SIZE;
assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE));
assertTrue("not already set: id=" + value, !toTest.setBit(value, Boolean.TRUE));
+ assertEquals("max set correct", instance, toTest.getLastSetIndex());
}
}
}
@@ -109,6 +118,22 @@ public class BitArrayBinTest extends Tes
assertTrue("not already set: id=" + instance, !toTest.setBit(instance, Boolean.TRUE));
}
+
+ public void testLastSeq() throws Exception {
+ BitArrayBin toTest = new BitArrayBin(512);
+ assertEquals("last not set", -1, toTest.getLastSetIndex());
+
+ toTest.setBit(1, Boolean.TRUE);
+ assertEquals("last correct", 1, toTest.getLastSetIndex());
+
+ toTest.setBit(64, Boolean.TRUE);
+ assertEquals("last correct", 64, toTest.getLastSetIndex());
+
+ toTest.setBit(68, Boolean.TRUE);
+ assertEquals("last correct", 68, toTest.getLastSetIndex());
+
+ }
+
// window moves in increments of BitArray.LONG_SIZE.
// valid data window on low end can be larger than window
private int roundWindow(int dataSetEnd, int windowSize) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Thu Jul 8 14:23:21 2010
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@@ -46,6 +47,8 @@ public class SocketProxy {
private Acceptor acceptor;
private ServerSocket serverSocket;
+
+ private CountDownLatch closed = new CountDownLatch(1);
public List<Connection> connections = new LinkedList<Connection>();
@@ -87,6 +90,7 @@ public class SocketProxy {
}
acceptor = new Acceptor(serverSocket, target);
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
+ closed = new CountDownLatch(1);
}
public URI getUrl() {
@@ -106,6 +110,11 @@ public class SocketProxy {
closeConnection(con);
}
acceptor.close();
+ closed.countDown();
+ }
+
+ public boolean waitUntilClosed(long timeoutSeconds) throws InterruptedException {
+ return closed.await(timeoutSeconds, TimeUnit.SECONDS);
}
/*
@@ -303,10 +312,12 @@ public class SocketProxy {
public void close() {
try {
socket.close();
+ closed.countDown();
goOn();
} catch (IOException ignored) {
}
}
}
+
}