You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/12 11:29:56 UTC

svn commit: r1760337 - /qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Author: lquack
Date: Mon Sep 12 11:29:56 2016
New Revision: 1760337

URL: http://svn.apache.org/viewvc?rev=1760337&view=rev
Log:
QPID-7417: [Java Broker] Add supporting CommitRollbackTest#testRollbackSoak test

Modified:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=1760337&r1=1760336&r2=1760337&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Sep 12 11:29:56 2016
@@ -20,14 +20,15 @@
  */
 package org.apache.qpid.test.unit.transacted;
 
-import org.apache.qpid.client.RejectBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -37,8 +38,19 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
  * This class tests a number of commits and roll back scenarios
@@ -50,8 +62,8 @@ import java.util.concurrent.TimeUnit;
 public class CommitRollbackTest extends QpidBrokerTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
-    private static final int POSITIVE_TIMEOUT = 2000;
-    private static final int NEGATIVE_TIMEOUT = 250;
+    private static final long POSITIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
+    private static final long NEGATIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_SHORT_TIMEOUT;
 
     protected AMQConnection _conn;
     private Session _session;
@@ -472,7 +484,7 @@ public class CommitRollbackTest extends
 
         _logger.info("Receiving messages");
 
-        Message result = _consumer.receive(POSITIVE_TIMEOUT);;
+        Message result = _consumer.receive(POSITIVE_TIMEOUT);
         assertNotNull("Message expected", result);
         // Expect the first message
         assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
@@ -502,6 +514,106 @@ public class CommitRollbackTest extends
         }
     }
 
+    /**
+     * Soak test: concurrent transacted consumers roll back every message they receive for
+     * {@code rollbackTime} ms, then switch to committing until {@code numberOfMessages} are
+     * consumed. Fails if the consumers do not all finish within {@code testTimeout} ms.
+     */
+    public void testRollbackSoak() throws Exception
+    {
+        newConnection();
+        final int rollbackTime = 2000;
+        final int numberOfMessages = 1000;
+        final int numberOfConsumers = 2;
+        final long testTimeout = numberOfMessages * POSITIVE_TIMEOUT / numberOfConsumers;
+        sendMessage(_pubSession, _jmsQueue, numberOfMessages);
+
+        List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
+        final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
+
+        try
+        {
+            final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
+            final AtomicInteger counter = new AtomicInteger();
+            final AtomicInteger rollbackCounter = new AtomicInteger();
+            final long flipModeTime = System.currentTimeMillis() + rollbackTime;
+            final AtomicBoolean shutdown = new AtomicBoolean();
+
+            for (int i = 0; i < numberOfConsumers; ++i)
+            {
+                consumerFutures.add(threadPool.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+                        final MessageConsumer consumer = session.createConsumer(_jmsQueue);
+
+                        while(!shutdown.get())
+                        {
+                            Message m = consumer.receive(POSITIVE_TIMEOUT);
+                            if (m != null)
+                            {
+                                long currentTime = System.currentTimeMillis();
+                                if (currentTime < flipModeTime)
+                                {
+                                    // Rollback phase: roll the receive back instead of consuming the message.
+                                    session.rollback();
+                                    rollbackCounter.incrementAndGet();
+                                }
+                                else
+                                {
+                                    // Commit phase: signal that the mode has flipped, then consume for real.
+                                    modeFlippedLatch.countDown();
+                                    counter.incrementAndGet();
+                                    session.commit();
+                                }
+                            }
+
+                            // Stop once every message has been committed or this worker thread was interrupted.
+                            if (counter.get() == numberOfMessages || Thread.currentThread().isInterrupted())
+                            {
+                                break;
+                            }
+                        }
+
+                        return null;
+                    }
+                }));
+            }
+
+            final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
+            // Bounded wait for the first consumer to enter the commit phase; the result is deliberately not asserted.
+            modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
+            try
+            {
+                combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
+                _logger.debug("Performed {} rollbacks, consumed {}/{} messages",
+                              rollbackCounter.get(), counter.get(), numberOfMessages);
+            }
+            catch (TimeoutException e)
+            {
+                fail(String.format(
+                        "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
+                        testTimeout / 1000., rollbackCounter.get(), counter.get(), numberOfMessages));
+            }
+            finally
+            {
+                shutdown.set(true);
+            }
+            assertEquals(String.format(
+                    "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
+                    rollbackCounter.get(),
+                    counter.get(),
+                    numberOfMessages), numberOfMessages, counter.get());
+        }
+        finally
+        {
+            threadPool.shutdownNow();
+            // POSITIVE_TIMEOUT is a millisecond value (see receive() above), so wait in milliseconds, not seconds.
+            threadPool.awaitTermination(2 * POSITIVE_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+    }
 
     public void testResendUnseenMessagesAfterRollback() throws Exception
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org