You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/10/31 20:01:14 UTC

[4/8] activemq-artemis git commit: ARTEMIS-829 Removing messages re-encoding

ARTEMIS-829 Removing messages re-encoding

https://issues.apache.org/jira/browse/ARTEMIS-829


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e0021252
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e0021252
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e0021252

Branch: refs/heads/master
Commit: e0021252ee94dcafe664520e080d5a6e13e3350f
Parents: 4b5cbb8
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 24 18:20:20 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:58 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientProducerImpl.java    |   2 -
 .../core/client/impl/ClientSessionImpl.java     |  26 +--
 .../core/client/impl/ClientSessionInternal.java |   2 -
 .../core/impl/ActiveMQSessionContext.java       |  10 +-
 .../spi/core/remoting/SessionContext.java       |   3 +-
 .../client/HornetQClientSessionContext.java     |   5 +-
 .../tests/extras/byteman/PagingLeakTest.java    |  14 +-
 .../tests/integration/client/ProducerTest.java  |   6 +-
 .../cluster/failover/BackupSyncJournalTest.java |   2 +-
 .../journal/NIOJournalCompactTest.java          | 180 ++++++++++---------
 .../journal/ValidateTransactionHealthTest.java  |  25 +--
 .../tests/integration/paging/PagingTest.java    |  12 +-
 12 files changed, 140 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index fddd4de..1dfbe72 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -284,8 +284,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       theCredits.acquireCredits(creditSize);
 
-      session.checkDefaultAddress(sendingAddress);
-
       sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index de45066..fd6355a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -135,8 +135,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    private volatile boolean mayAttemptToFailover = true;
 
-   private volatile SimpleString defaultAddress;
-
    /**
     * Current XID. this will be used in case of failover
     */
@@ -957,7 +955,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                // want
                // to recreate the session, we just want to unblock the blocking call
                if (!inClose && mayAttemptToFailover) {
-                  sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
+                  sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
 
                   for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) {
 
@@ -1036,27 +1034,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    @Override
    public void setAddress(final Message message, final SimpleString address) {
-      if (defaultAddress == null) {
-         logger.tracef("setAddress() Setting default address as %s", address);
+      logger.tracef("setAddress() Setting default address as %s", address);
 
-         message.setAddress(address);
-      } else {
-         if (!address.equals(defaultAddress)) {
-            logger.tracef("setAddress() setting non default address %s on message", address);
-            message.setAddress(address);
-         } else {
-            logger.trace("setAddress() being set as null");
-            message.setAddress(null);
-         }
-      }
-   }
-
-   @Override
-   public void checkDefaultAddress(SimpleString address) {
-      if (defaultAddress == null) {
-         logger.tracef("checkDefaultAddress(%s)", address);
-         defaultAddress = address;
-      }
+      message.setAddress(address);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index ed636bd..4e06068 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -93,8 +93,6 @@ public interface ClientSessionInternal extends ClientSession {
     */
    void setAddress(Message message, SimpleString address);
 
-   void checkDefaultAddress(SimpleString address);
-
    void setPacketSize(int packetSize);
 
    void resetIfNeeded() throws ActiveMQException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index c72e19b..56c7135 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -629,9 +629,8 @@ public class ActiveMQSessionContext extends SessionContext {
                                final boolean xa,
                                final boolean autoCommitSends,
                                final boolean autoCommitAcks,
-                               final boolean preAcknowledge,
-                               final SimpleString defaultAddress) throws ActiveMQException {
-      Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress);
+                               final boolean preAcknowledge) throws ActiveMQException {
+      Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
       boolean retry;
       do {
          try {
@@ -662,9 +661,8 @@ public class ActiveMQSessionContext extends SessionContext {
                                                    boolean xa,
                                                    boolean autoCommitSends,
                                                    boolean autoCommitAcks,
-                                                   boolean preAcknowledge,
-                                                   SimpleString defaultAddress) {
-      return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString());
+                                                   boolean preAcknowledge) {
+      return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 175360c..1f15cc6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -250,8 +250,7 @@ public abstract class SessionContext {
                                         final boolean xa,
                                         final boolean autoCommitSends,
                                         final boolean autoCommitAcks,
-                                        final boolean preAcknowledge,
-                                        final SimpleString defaultAddress) throws ActiveMQException;
+                                        final boolean preAcknowledge) throws ActiveMQException;
 
    public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index d932274..caa94a1 100644
--- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -63,9 +63,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
                                                    boolean xa,
                                                    boolean autoCommitSends,
                                                    boolean autoCommitAcks,
-                                                   boolean preAcknowledge,
-                                                   SimpleString defaultAddress) {
-      return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString());
+                                                   boolean preAcknowledge) {
+      return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), null);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java
index f9744d2..4ffd2bd 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@@ -92,10 +93,13 @@ public class PagingLeakTest extends ActiveMQTestBase {
 
       positions.clear();
 
-      timeout = System.currentTimeMillis() + 5000;
-      while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) {
-         forceGC();
-      }
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            forceGC();
+            return pagePosInstances.get() == 0;
+         }
+      }, 5000, 100);
 
       // This is just to validate the rules are correctly applied on byteman
       assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get());
@@ -110,7 +114,7 @@ public class PagingLeakTest extends ActiveMQTestBase {
 
       server.start();
 
-      AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(10 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
       server.getAddressSettingsRepository().addMatch("#", settings);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java
index c409d5f..d7af4b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java
@@ -104,12 +104,12 @@ public class ProducerTest extends ActiveMQTestBase {
                   ClientProducer producer = session.createProducer();
 
                   for (int i = 0; i < 62; i++) {
-                     if (i == 61) {
+                     if (i == 30) {
                         // the point where the send would block
                         latch.countDown();
                      }
                      ClientMessage msg = session.createMessage(false);
-                     msg.getBodyBuffer().writeBytes(new byte[1024]);
+                     msg.getBodyBuffer().writeBytes(new byte[2048]);
                      producer.send(QUEUE, msg);
                   }
                } catch (Exception e) {
@@ -119,7 +119,7 @@ public class ProducerTest extends ActiveMQTestBase {
          };
 
          t.start();
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
          session.close();
 
          t.join(5000);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index 20ddae3..b51ff8a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -93,7 +93,7 @@ public class BackupSyncJournalTest extends FailoverTestBase {
 
    @Test
    public void testReserveFileIdValuesOnBackup() throws Exception {
-      final int totalRounds = 50;
+      final int totalRounds = 5;
       createProducerSendSomeMessages();
       JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
       for (int i = 0; i < totalRounds; i++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index ee1ac11..2dd38ae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -730,7 +730,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
    @Test
    public void testCompactAddAndUpdateFollowedByADelete() throws Exception {
-
       setup(2, 60 * 1024, false);
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
@@ -779,7 +778,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
       createJournal();
       startJournal();
       loadAndCheck();
-
    }
 
    @Test
@@ -1610,8 +1608,9 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
    }
 
+
    @Test
-   public void testStressDeletesNoSync() throws Exception {
+   public void testStressDeletesNoSync() throws Throwable {
       Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0);
 
       final AtomicInteger errors = new AtomicInteger(0);
@@ -1629,114 +1628,129 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
       final JournalStorageManager storage = new JournalStorageManager(config, factory);
 
       storage.start();
-      storage.loadInternalOnly();
-
-      ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false);
-      final LinkedList<Long> survivingMsgs = new LinkedList<>();
 
-      Runnable producerRunnable = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               while (running.get()) {
-                  final long[] values = new long[100];
-                  long tx = seqGenerator.incrementAndGet();
+      try {
+         storage.loadInternalOnly();
 
-                  OperationContextImpl ctx = new OperationContextImpl(executor);
-                  storage.setContext(ctx);
+         ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false);
+         final LinkedList<Long> survivingMsgs = new LinkedList<>();
 
-                  for (int i = 0; i < 100; i++) {
-                     long id = seqGenerator.incrementAndGet();
-                     values[i] = id;
+         Runnable producerRunnable = new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  while (running.get()) {
+                     final long[] values = new long[100];
+                     long tx = seqGenerator.incrementAndGet();
 
-                     ServerMessageImpl message = new ServerMessageImpl(id, 100);
+                     OperationContextImpl ctx = new OperationContextImpl(executor);
+                     storage.setContext(ctx);
 
-                     message.getBodyBuffer().writeBytes(new byte[1024]);
+                     for (int i = 0; i < 100; i++) {
+                        long id = seqGenerator.incrementAndGet();
+                        values[i] = id;
 
-                     storage.storeMessageTransactional(tx, message);
-                  }
-                  ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
+                        ServerMessageImpl message = new ServerMessageImpl(id, 100);
 
-                  survivingMsgs.add(message.getMessageID());
+                        message.getBodyBuffer().writeBytes(new byte[1024]);
 
-                  // This one will stay here forever
-                  storage.storeMessage(message);
-
-                  storage.commit(tx);
-
-                  ctx.executeOnCompletion(new IOCallback() {
-                     @Override
-                     public void onError(int errorCode, String errorMessage) {
+                        storage.storeMessageTransactional(tx, message);
                      }
-
-                     @Override
-                     public void done() {
-                        deleteExecutor.execute(new Runnable() {
-                           @Override
-                           public void run() {
-                              try {
-                                 for (long messageID : values) {
-                                    storage.deleteMessage(messageID);
+                     ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100);
+
+                     survivingMsgs.add(message.getMessageID());
+
+                     // This one will stay here forever
+                     storage.storeMessage(message);
+
+                     storage.commit(tx);
+
+                     ctx.executeOnCompletion(new IOCallback() {
+                        @Override
+                        public void onError(int errorCode, String errorMessage) {
+                        }
+
+                        @Override
+                        public void done() {
+                           deleteExecutor.execute(new Runnable() {
+                              @Override
+                              public void run() {
+                                 try {
+                                    for (long messageID : values) {
+                                       storage.deleteMessage(messageID);
+                                    }
+                                 } catch (Exception e) {
+                                    e.printStackTrace();
+                                    errors.incrementAndGet();
                                  }
-                              } catch (Exception e) {
-                                 e.printStackTrace();
-                                 errors.incrementAndGet();
-                              }
 
-                           }
-                        });
-                     }
-                  });
+                              }
+                           });
+                        }
+                     });
 
+                  }
+               } catch (Throwable e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
                }
-            } catch (Throwable e) {
-               e.printStackTrace();
-               errors.incrementAndGet();
             }
-         }
-      };
-
-      Runnable compressRunnable = new Runnable() {
-         @Override
-         public void run() {
-            try {
-               while (running.get()) {
-                  Thread.sleep(500);
-                  System.out.println("Compacting");
-                  ((JournalImpl) storage.getMessageJournal()).testCompact();
-                  ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
+         };
+
+         Runnable compressRunnable = new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  while (running.get()) {
+                     Thread.sleep(500);
+                     System.out.println("Compacting");
+                     ((JournalImpl) storage.getMessageJournal()).testCompact();
+                     ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus();
+                  }
+               } catch (Throwable e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
                }
-            } catch (Throwable e) {
-               e.printStackTrace();
-               errors.incrementAndGet();
+
             }
+         };
 
-         }
-      };
+         Thread producerThread = new Thread(producerRunnable);
+         producerThread.start();
 
-      Thread producerThread = new Thread(producerRunnable);
-      producerThread.start();
+         Thread compactorThread = new Thread(compressRunnable);
+         compactorThread.start();
 
-      Thread compactorThread = new Thread(compressRunnable);
-      compactorThread.start();
+         Thread.sleep(1000);
 
-      Thread.sleep(1000);
+         running.set(false);
 
-      running.set(false);
+         producerThread.join();
 
-      producerThread.join();
+         compactorThread.join();
 
-      compactorThread.join();
+         deleteExecutor.shutdown();
 
-      storage.stop();
+         assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
 
-      executor.shutdown();
+         executor.shutdown();
 
-      assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
+         assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS));
 
-      deleteExecutor.shutdown();
+      } catch (Throwable e) {
+         e.printStackTrace();
+         throw e;
+      } finally {
+         try {
+            storage.stop();
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+
+         executor.shutdownNow();
+         deleteExecutor.shutdownNow();
+      }
 
-      assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 1972863..2d3df3e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -144,18 +144,21 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
       JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir);
 
       journal.start();
-      Loader loadTest = new Loader(numberOfRecords);
-      journal.load(loadTest);
-      Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
-      Assert.assertEquals(0, loadTest.numberOfPreparedTransactions);
-      Assert.assertEquals(0, loadTest.numberOfUpdates);
-      Assert.assertEquals(0, loadTest.numberOfDeletes);
-
-      journal.stop();
-
-      if (loadTest.ex != null) {
-         throw loadTest.ex;
+      try {
+         Loader loadTest = new Loader(numberOfRecords);
+         journal.load(loadTest);
+         Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds);
+         Assert.assertEquals(0, loadTest.numberOfPreparedTransactions);
+         Assert.assertEquals(0, loadTest.numberOfUpdates);
+         Assert.assertEquals(0, loadTest.numberOfDeletes);
+
+         if (loadTest.ex != null) {
+            throw loadTest.ex;
+         }
+      } finally {
+         journal.stop();
       }
+
    }
 
    // Inner classes -------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 6f0bdc1..00c0bdf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -3335,7 +3335,7 @@ public class PagingTest extends ActiveMQTestBase {
       ClientMessage message = null;
 
       for (int i = 0; i < numberOfMessages; i++) {
-         byte[] body = new byte[1024];
+         byte[] body = new byte[2048];
 
          message = session.createMessage(true);
          message.getBodyBuffer().writeBytes(body);
@@ -3360,7 +3360,7 @@ public class PagingTest extends ActiveMQTestBase {
       Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize());
 
       for (int i = 0; i < numberOfMessages; i++) {
-         byte[] body = new byte[1024];
+         byte[] body = new byte[2048];
 
          message = session.createMessage(true);
          message.getBodyBuffer().writeBytes(body);
@@ -3385,7 +3385,7 @@ public class PagingTest extends ActiveMQTestBase {
       producer = session.createProducer(PagingTest.ADDRESS);
 
       for (int i = 0; i < numberOfMessages; i++) {
-         byte[] body = new byte[1024];
+         byte[] body = new byte[2048];
 
          message = session.createMessage(true);
          message.getBodyBuffer().writeBytes(body);
@@ -3841,7 +3841,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024);
 
-      server = createServer(true, config, 512 * 1024, 1024 * 1024);
+      server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2);
 
       server.start();
 
@@ -4745,7 +4745,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       ClientMessage message = session.createMessage(true);
 
-      int biggerMessageSize = 1024;
+      int biggerMessageSize = 2048;
       byte[] body = new byte[biggerMessageSize];
       ByteBuffer bb = ByteBuffer.wrap(body);
       for (int j = 1; j <= biggerMessageSize; j++) {
@@ -4817,7 +4817,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       ClientMessage message = session.createMessage(true);
 
-      int biggerMessageSize = 1024;
+      int biggerMessageSize = 2048;
       byte[] body = new byte[biggerMessageSize];
       ByteBuffer bb = ByteBuffer.wrap(body);
       for (int j = 1; j <= biggerMessageSize; j++) {