You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/12 11:29:56 UTC
svn commit: r1760337 -
/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Author: lquack
Date: Mon Sep 12 11:29:56 2016
New Revision: 1760337
URL: http://svn.apache.org/viewvc?rev=1760337&view=rev
Log:
QPID-7417: [Java Broker] Add supporting CommitRollbackTest#testRollbackSoak test
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=1760337&r1=1760336&r2=1760337&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Sep 12 11:29:56 2016
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.test.unit.transacted;
-import org.apache.qpid.client.RejectBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -37,8 +38,19 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* This class tests a number of commits and roll back scenarios
@@ -50,8 +62,8 @@ import java.util.concurrent.TimeUnit;
public class CommitRollbackTest extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
- private static final int POSITIVE_TIMEOUT = 2000;
- private static final int NEGATIVE_TIMEOUT = 250;
+ private static final long POSITIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
+ private static final long NEGATIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_SHORT_TIMEOUT;
protected AMQConnection _conn;
private Session _session;
@@ -472,7 +484,7 @@ public class CommitRollbackTest extends
_logger.info("Receiving messages");
- Message result = _consumer.receive(POSITIVE_TIMEOUT);;
+ Message result = _consumer.receive(POSITIVE_TIMEOUT);
assertNotNull("Message expected", result);
// Expect the first message
assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
@@ -502,6 +514,106 @@ public class CommitRollbackTest extends
}
}
+ public void testRollbackSoak() throws Exception
+ {
+ newConnection();
+ final int rollbackTime = 2000;
+ final int numberOfMessages = 1000;
+ final int numberOfConsumers = 2;
+ final long testTimeout = numberOfMessages * POSITIVE_TIMEOUT / numberOfConsumers;
+ sendMessage(_pubSession, _jmsQueue, numberOfMessages);
+
+ List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
+ final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
+
+ try
+ {
+ final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
+ final AtomicInteger counter = new AtomicInteger();
+ final AtomicInteger rollbackCounter = new AtomicInteger();
+ final long flipModeTime = System.currentTimeMillis() + rollbackTime;
+ final AtomicBoolean shutdown = new AtomicBoolean();
+
+ for (int i = 0; i < numberOfConsumers; ++i)
+ {
+ consumerFutures.add(threadPool.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+ final MessageConsumer consumer = session.createConsumer(_jmsQueue);
+
+ while(!shutdown.get())
+ {
+ Message m = consumer.receive(POSITIVE_TIMEOUT);
+ if (m != null)
+ {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime < flipModeTime)
+ {
+ session.rollback();
+ rollbackCounter.incrementAndGet();
+ }
+ else
+ {
+ modeFlippedLatch.countDown();
+ counter.incrementAndGet();
+ session.commit();
+ }
+ }
+
+ if (counter.get() == numberOfMessages)
+ {
+ break;
+ }
+
+ if (Thread.currentThread().isInterrupted())
+ {
+ break;
+ }
+ }
+
+ return null;
+ }
+ }));
+ }
+
+ final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
+ modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
+ try
+ {
+ combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
+ _logger.debug("Performed {} rollbacks, consumed {}/{} messages",
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages);
+ }
+ catch (TimeoutException e)
+ {
+ fail(String.format(
+ "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
+ testTimeout / 1000.,
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages));
+ }
+ finally
+ {
+ shutdown.set(true);
+ }
+ assertEquals(String.format(
+ "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages), numberOfMessages, counter.get());
+ }
+ finally
+ {
+ threadPool.shutdownNow();
+ threadPool.awaitTermination(2 * POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+ }
+ }
public void testResendUnseenMessagesAfterRollback() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org