You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/11 02:28:19 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1333 SendACK listener message loss (adding test)

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 393cf9bfe -> e5ef406e4


ARTEMIS-1333 SendACK listener message loss (adding test)

next commit should have the fix.
this is to make it easy to confirm the fix by people looking.

(cherry picked from commit 96c6268f5a68a293589ac0061d561265d9e79972)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9724571a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9724571a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9724571a

Branch: refs/heads/1.x
Commit: 9724571a98dbd4dad241aded3d01fb0f529087d8
Parents: 393cf9b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Aug 8 22:39:20 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 10 22:24:37 2017 -0400

----------------------------------------------------------------------
 .../core/server/impl/ActiveMQServerImpl.java    |   2 +-
 .../integration/client/SendAckFailTest.java     | 766 +++++++++++++++++++
 2 files changed, 767 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9724571a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index eccc8b9..97cb4aa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1859,7 +1859,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    /**
     * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
     */
-   private StorageManager createStorageManager() {
+   protected StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
             return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9724571a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
new file mode 100644
index 0000000..7d969a2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.GroupingInfo;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
+import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SendAckFailTest extends ActiveMQTestBase {
+
+   @Before
+   @After
+   public void deleteDirectory() throws Exception {
+      deleteDirectory(new File("./target/send-ack"));
+   }
+
+   @Override
+   public String getJournalDir(final int index, final boolean backup) {
+      return "./target/send-ack/journal";
+   }
+
+   @Override
+   protected String getBindingsDir(final int index, final boolean backup) {
+      return "./target/send-ack/binding";
+   }
+
+   @Override
+   protected String getPageDir(final int index, final boolean backup) {
+      return "./target/send-ack/page";
+   }
+
+   @Override
+   protected String getLargeMessagesDir(final int index, final boolean backup) {
+      return "./target/send-ack/large-message";
+   }
+
+   @Test
+   public void testSend() throws Exception {
+      Process process = SpawnedVMSupport.spawnVM(SendAckFailTest.class.getName());
+      ActiveMQServer server = null;
+
+      try {
+
+         HashSet<Integer> listSent = new HashSet<>();
+
+         Thread t = null;
+         {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+            ServerLocator locator = factory.getServerLocator();
+
+            locator.setConfirmationWindowSize(0).setInitialConnectAttempts(100).setRetryInterval(100).setBlockOnDurableSend(false).setReconnectAttempts(0);
+
+            ClientSessionFactory sf = locator.createSessionFactory();
+
+            ClientSession session = sf.createSession();
+            session.createQueue("T1", "T1", true);
+
+            ClientProducer producer = session.createProducer("T1");
+
+            session.setSendAcknowledgementHandler(new SendAcknowledgementHandler() {
+               @Override
+               public void sendAcknowledged(Message message) {
+                  listSent.add(message.getIntProperty("myid"));
+               }
+            });
+
+            t = new Thread() {
+               @Override
+               public void run() {
+                  for (int i = 0; i < 5000; i++) {
+                     try {
+                        producer.send(session.createMessage(true).putIntProperty("myid", i));
+                     } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                     }
+                  }
+               }
+            };
+            t.start();
+         }
+
+         Wait.waitFor(() -> listSent.size() > 100, 5000, 10);
+
+         Assert.assertTrue(process.waitFor(1, TimeUnit.MINUTES));
+
+         server = startServer(false);
+
+         {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+            ServerLocator locator = factory.getServerLocator();
+
+            ClientSessionFactory sf = locator.createSessionFactory();
+
+            ClientSession session = sf.createSession();
+
+            ClientConsumer consumer = session.createConsumer("T1");
+
+            session.start();
+
+            for (int i = 0; i < listSent.size(); i++) {
+               ClientMessage message = consumer.receive(1000);
+               if (message == null) {
+                  for (Integer msgi : listSent) {
+                     System.out.println("Message " + msgi + " was lost");
+                  }
+                  fail("missed messages!");
+               }
+               message.acknowledge();
+
+               if (!listSent.remove(message.getIntProperty("myid"))) {
+                  System.out.println("Message " + message + " with id " + message.getIntProperty("myid") + " received in duplicate");
+                  fail("Message " + message + " with id " + message.getIntProperty("myid") + " received in duplicate");
+               }
+            }
+         }
+
+      } finally {
+         if (process != null) {
+            process.destroy();
+         }
+         if (server != null) {
+            server.stop();
+         }
+      }
+   }
+
+   public static void main(String[] arg) {
+      SendAckFailTest test = new SendAckFailTest();
+      test.startServer(true);
+   }
+
+   public ActiveMQServer startServer(boolean fail) {
+      try {
+         //ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true, true);
+
+         AtomicInteger count = new AtomicInteger(0);
+
+         ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+
+         Configuration configuration = createDefaultConfig(true);
+
+         ActiveMQServer server = new ActiveMQServerImpl(configuration, ManagementFactory.getPlatformMBeanServer(), securityManager) {
+            @Override
+            public StorageManager createStorageManager() {
+               StorageManager original = super.createStorageManager();
+
+               return new StorageManagerDelegate(original) {
+                  @Override
+                  public void storeMessage(ServerMessage message) throws Exception {
+
+                     if (fail) {
+                        if (count.incrementAndGet() == 110) {
+                           System.out.println("Failing " + message);
+                           System.out.flush();
+                           Thread.sleep(100);
+                           Runtime.getRuntime().halt(-1);
+                        }
+                     }
+                     super.storeMessage(message);
+
+                  }
+               };
+
+            }
+         };
+
+
+
+         System.out.println("Location::" + server.getConfiguration().getJournalLocation().getAbsolutePath());
+         server.start();
+         return server;
+      } catch (Exception e) {
+         e.printStackTrace();
+         return null;
+      }
+   }
+
+
+   private class StorageManagerDelegate implements StorageManager {
+
+      @Override
+      public void start() throws Exception {
+         manager.start();
+      }
+
+      @Override
+      public void stop() throws Exception {
+         manager.stop();
+      }
+
+      @Override
+      public boolean isStarted() {
+         return manager.isStarted();
+      }
+
+      @Override
+      public long generateID() {
+         return manager.generateID();
+      }
+
+      @Override
+      public long getCurrentID() {
+         return manager.getCurrentID();
+      }
+
+      @Override
+      public void criticalError(Throwable error) {
+         manager.criticalError(error);
+      }
+
+      @Override
+      public OperationContext getContext() {
+         return manager.getContext();
+      }
+
+      @Override
+      public void lineUpContext() {
+         manager.lineUpContext();
+      }
+
+      @Override
+      public OperationContext newContext(Executor executor) {
+         return manager.newContext(executor);
+      }
+
+      @Override
+      public OperationContext newSingleThreadContext() {
+         return manager.newSingleThreadContext();
+      }
+
+      @Override
+      public void setContext(OperationContext context) {
+         manager.setContext(context);
+      }
+
+      @Override
+      public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
+         manager.stop(ioCriticalError, sendFailover);
+      }
+
+      @Override
+      public void pageClosed(SimpleString storeName, int pageNumber) {
+         manager.pageClosed(storeName, pageNumber);
+      }
+
+      @Override
+      public void pageDeleted(SimpleString storeName, int pageNumber) {
+         manager.pageDeleted(storeName, pageNumber);
+      }
+
+      @Override
+      public void pageWrite(PagedMessage message, int pageNumber) {
+         manager.pageWrite(message, pageNumber);
+      }
+
+      @Override
+      public void afterCompleteOperations(IOCallback run) {
+         manager.afterCompleteOperations(run);
+      }
+
+      @Override
+      public void afterStoreOperations(IOCallback run) {
+         manager.afterStoreOperations(run);
+      }
+
+      @Override
+      public boolean waitOnOperations(long timeout) throws Exception {
+         return manager.waitOnOperations(timeout);
+      }
+
+      @Override
+      public void waitOnOperations() throws Exception {
+         manager.waitOnOperations();
+      }
+
+      @Override
+      public void beforePageRead() throws Exception {
+         manager.beforePageRead();
+      }
+
+      @Override
+      public void afterPageRead() throws Exception {
+         manager.afterPageRead();
+      }
+
+      @Override
+      public ByteBuffer allocateDirectBuffer(int size) {
+         return manager.allocateDirectBuffer(size);
+      }
+
+      @Override
+      public void freeDirectBuffer(ByteBuffer buffer) {
+         manager.freeDirectBuffer(buffer);
+      }
+
+      @Override
+      public void clearContext() {
+         manager.clearContext();
+      }
+
+      @Override
+      public void confirmPendingLargeMessageTX(Transaction transaction,
+                                               long messageID,
+                                               long recordID) throws Exception {
+         manager.confirmPendingLargeMessageTX(transaction, messageID, recordID);
+      }
+
+      @Override
+      public void confirmPendingLargeMessage(long recordID) throws Exception {
+         manager.confirmPendingLargeMessage(recordID);
+      }
+
+      @Override
+      public void storeMessage(ServerMessage message) throws Exception {
+         manager.storeMessage(message);
+      }
+
+      @Override
+      public void storeReference(long queueID, long messageID, boolean last) throws Exception {
+         manager.storeReference(queueID, messageID, last);
+      }
+
+      @Override
+      public void deleteMessage(long messageID) throws Exception {
+         manager.deleteMessage(messageID);
+      }
+
+      @Override
+      public void storeAcknowledge(long queueID, long messageID) throws Exception {
+         manager.storeAcknowledge(queueID, messageID);
+      }
+
+      @Override
+      public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception {
+         manager.storeCursorAcknowledge(queueID, position);
+      }
+
+      @Override
+      public void updateDeliveryCount(MessageReference ref) throws Exception {
+         manager.updateDeliveryCount(ref);
+      }
+
+      @Override
+      public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
+         manager.updateScheduledDeliveryTime(ref);
+      }
+
+      @Override
+      public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception {
+         manager.storeDuplicateID(address, duplID, recordID);
+      }
+
+      @Override
+      public void deleteDuplicateID(long recordID) throws Exception {
+         manager.deleteDuplicateID(recordID);
+      }
+
+      @Override
+      public void storeMessageTransactional(long txID, ServerMessage message) throws Exception {
+         manager.storeMessageTransactional(txID, message);
+      }
+
+      @Override
+      public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception {
+         manager.storeReferenceTransactional(txID, queueID, messageID);
+      }
+
+      @Override
+      public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception {
+         manager.storeAcknowledgeTransactional(txID, queueID, messageID);
+      }
+
+      @Override
+      public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception {
+         manager.storeCursorAcknowledgeTransactional(txID, queueID, position);
+      }
+
+      @Override
+      public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception {
+         manager.deleteCursorAcknowledgeTransactional(txID, ackID);
+      }
+
+      @Override
+      public void deleteCursorAcknowledge(long ackID) throws Exception {
+         manager.deleteCursorAcknowledge(ackID);
+      }
+
+      @Override
+      public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception {
+         manager.storePageCompleteTransactional(txID, queueID, position);
+      }
+
+      @Override
+      public void deletePageComplete(long ackID) throws Exception {
+         manager.deletePageComplete(ackID);
+      }
+
+      @Override
+      public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception {
+         manager.updateScheduledDeliveryTimeTransactional(txID, ref);
+      }
+
+      @Override
+      public void storeDuplicateIDTransactional(long txID,
+                                                SimpleString address,
+                                                byte[] duplID,
+                                                long recordID) throws Exception {
+         manager.storeDuplicateIDTransactional(txID, address, duplID, recordID);
+      }
+
+      @Override
+      public void updateDuplicateIDTransactional(long txID,
+                                                 SimpleString address,
+                                                 byte[] duplID,
+                                                 long recordID) throws Exception {
+         manager.updateDuplicateIDTransactional(txID, address, duplID, recordID);
+      }
+
+      @Override
+      public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception {
+         manager.deleteDuplicateIDTransactional(txID, recordID);
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage() {
+         return manager.createLargeMessage();
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception {
+         return manager.createLargeMessage(id, message);
+      }
+
+      @Override
+      public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+         return manager.createFileForLargeMessage(messageID, extension);
+      }
+
+      @Override
+      public void prepare(long txID, Xid xid) throws Exception {
+         manager.prepare(txID, xid);
+      }
+
+      @Override
+      public void commit(long txID) throws Exception {
+         manager.commit(txID);
+      }
+
+      @Override
+      public void commit(long txID, boolean lineUpContext) throws Exception {
+         manager.commit(txID, lineUpContext);
+      }
+
+      @Override
+      public void rollback(long txID) throws Exception {
+         manager.rollback(txID);
+      }
+
+      @Override
+      public void rollbackBindings(long txID) throws Exception {
+         manager.rollbackBindings(txID);
+      }
+
+      @Override
+      public void commitBindings(long txID) throws Exception {
+         manager.commitBindings(txID);
+      }
+
+      @Override
+      public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception {
+         manager.storePageTransaction(txID, pageTransaction);
+      }
+
+      @Override
+      public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception {
+         manager.updatePageTransaction(txID, pageTransaction, depage);
+      }
+
+      @Override
+      public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception {
+         manager.updatePageTransaction(pageTransaction, depage);
+      }
+
+      @Override
+      public void deletePageTransactional(long recordID) throws Exception {
+         manager.deletePageTransactional(recordID);
+      }
+
+      @Override
+      public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
+                                                       PagingManager pagingManager,
+                                                       ResourceManager resourceManager,
+                                                       Map<Long, QueueBindingInfo> queueInfos,
+                                                       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                                       Set<Pair<Long, Long>> pendingLargeMessages,
+                                                       List<PageCountPending> pendingNonTXPageCounter,
+                                                       JournalLoader journalLoader) throws Exception {
+         return manager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, pendingNonTXPageCounter, journalLoader);
+      }
+
+      @Override
+      public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception {
+         return manager.storeHeuristicCompletion(xid, isCommit);
+      }
+
+      @Override
+      public void deleteHeuristicCompletion(long id) throws Exception {
+         manager.deleteHeuristicCompletion(id);
+      }
+
+      @Override
+      public void addQueueBinding(long tx, Binding binding) throws Exception {
+         manager.addQueueBinding(tx, binding);
+      }
+
+      @Override
+      public void deleteQueueBinding(long tx, long queueBindingID) throws Exception {
+         manager.deleteQueueBinding(tx, queueBindingID);
+      }
+
+      @Override
+      public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+         return manager.storeQueueStatus(queueID, status);
+      }
+
+      @Override
+      public void deleteQueueStatus(long recordID) throws Exception {
+         manager.deleteQueueStatus(recordID);
+      }
+
+      @Override
+      public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
+                                                       List<GroupingInfo> groupingInfos) throws Exception {
+         return manager.loadBindingJournal(queueBindingInfos, groupingInfos);
+      }
+
+      @Override
+      public void addGrouping(GroupBinding groupBinding) throws Exception {
+         manager.addGrouping(groupBinding);
+      }
+
+      @Override
+      public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception {
+         manager.deleteGrouping(tx, groupBinding);
+      }
+
+      @Override
+      public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception {
+         manager.storeAddressSetting(addressSetting);
+      }
+
+      @Override
+      public void deleteAddressSetting(SimpleString addressMatch) throws Exception {
+         manager.deleteAddressSetting(addressMatch);
+      }
+
+      @Override
+      public List<PersistedAddressSetting> recoverAddressSettings() throws Exception {
+         return manager.recoverAddressSettings();
+      }
+
+      @Override
+      public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception {
+         manager.storeSecurityRoles(persistedRoles);
+      }
+
+      @Override
+      public void deleteSecurityRoles(SimpleString addressMatch) throws Exception {
+         manager.deleteSecurityRoles(addressMatch);
+      }
+
+      @Override
+      public List<PersistedRoles> recoverPersistedRoles() throws Exception {
+         return manager.recoverPersistedRoles();
+      }
+
+      @Override
+      public long storePageCounter(long txID, long queueID, long value) throws Exception {
+         return manager.storePageCounter(txID, queueID, value);
+      }
+
+      @Override
+      public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
+         return manager.storePendingCounter(queueID, pageID, inc);
+      }
+
+      @Override
+      public void deleteIncrementRecord(long txID, long recordID) throws Exception {
+         manager.deleteIncrementRecord(txID, recordID);
+      }
+
+      @Override
+      public void deletePageCounter(long txID, long recordID) throws Exception {
+         manager.deletePageCounter(txID, recordID);
+      }
+
+      @Override
+      public void deletePendingPageCounter(long txID, long recordID) throws Exception {
+         manager.deletePendingPageCounter(txID, recordID);
+      }
+
+      @Override
+      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
+         return manager.storePageCounterInc(txID, queueID, add);
+      }
+
+      @Override
+      public long storePageCounterInc(long queueID, int add) throws Exception {
+         return manager.storePageCounterInc(queueID, add);
+      }
+
+      @Override
+      public Journal getBindingsJournal() {
+         return manager.getBindingsJournal();
+      }
+
+      @Override
+      public Journal getMessageJournal() {
+         return manager.getMessageJournal();
+      }
+
+      @Override
+      public void startReplication(ReplicationManager replicationManager,
+                                   PagingManager pagingManager,
+                                   String nodeID,
+                                   boolean autoFailBack,
+                                   long initialReplicationSyncTimeout) throws Exception {
+         manager.startReplication(replicationManager, pagingManager, nodeID, autoFailBack, initialReplicationSyncTimeout);
+      }
+
+      @Override
+      public boolean addToPage(PagingStore store,
+                               ServerMessage msg,
+                               Transaction tx,
+                               RouteContextList listCtx) throws Exception {
+         return manager.addToPage(store, msg, tx, listCtx);
+      }
+
+      @Override
+      public void stopReplication() {
+         manager.stopReplication();
+      }
+
+      @Override
+      public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception {
+         manager.addBytesToLargeMessage(appendFile, messageID, bytes);
+      }
+
+      @Override
+      public void storeID(long journalID, long id) throws Exception {
+         manager.storeID(journalID, id);
+      }
+
+      @Override
+      public void deleteID(long journalD) throws Exception {
+         manager.deleteID(journalD);
+      }
+
+      @Override
+      public void readLock() {
+         manager.readLock();
+      }
+
+      @Override
+      public void readUnLock() {
+         manager.readUnLock();
+      }
+
+      @Override
+      public void persistIdGenerator() {
+         manager.persistIdGenerator();
+      }
+
+      @Override
+      public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+         manager.injectMonitor(monitor);
+      }
+
+      private final StorageManager manager;
+
+      StorageManagerDelegate(StorageManager manager) {
+         this.manager = manager;
+      }
+   }
+
+}
\ No newline at end of file


[2/2] activemq-artemis git commit: ARTEMIS-1333 Fix SendACK

Posted by cl...@apache.org.
ARTEMIS-1333 Fix SendACK

(fix copied manually from master.. not possible to cherry-pick)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e5ef406e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e5ef406e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e5ef406e

Branch: refs/heads/1.x
Commit: e5ef406e4e4fbb03b2ce1afdf19d7e9aff8367f4
Parents: 9724571
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 10 22:24:21 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 10 22:27:57 2017 -0400

----------------------------------------------------------------------
 .../core/ServerSessionPacketHandler.java        | 621 ++++++++++---------
 .../core/impl/ActiveMQPacketHandler.java        |   7 +-
 .../protocol/core/impl/CoreSessionCallback.java |  18 +
 .../core/server/impl/ServerSessionImpl.java     |   1 +
 .../spi/core/protocol/SessionCallback.java      |   6 +-
 .../ActiveMQServerControlUsingCoreTest.java     |   7 +
 6 files changed, 354 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 5dbb4f1..c3fc01d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,6 +20,7 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -133,6 +134,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
+   private static final ThreadLocal<AtomicBoolean> inHandler = ThreadLocal.withInitial(AtomicBoolean::new);
+
    private final Executor callExecutor;
 
    public ServerSessionPacketHandler(final Executor callExecutor,
@@ -170,18 +173,21 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
       ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
 
+      flushExecutor();
+
       try {
          session.close(true);
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.errorClosingSession(e);
       }
-      flushExecutor();
 
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
-   private void flushExecutor() {
-      OrderedExecutorFactory.flushExecutor(callExecutor);
+   public void flushExecutor() {
+      if (!inHandler.get().get()) {
+         OrderedExecutorFactory.flushExecutor(callExecutor);
+      }
    }
 
    public void close() {
@@ -202,345 +208,350 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    @Override
    public void handlePacket(final Packet packet) {
-      channel.confirm(packet);
       callExecutor.execute(() -> internalHandlePacket(packet));
    }
 
    private void internalHandlePacket(final Packet packet) {
-      byte type = packet.getType();
 
-      storageManager.setContext(session.getSessionContext());
+      inHandler.get().set(true);
+      try {
+         byte type = packet.getType();
 
-      Packet response = null;
-      boolean flush = false;
-      boolean closeChannel = false;
-      boolean requiresResponse = false;
+         storageManager.setContext(session.getSessionContext());
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
-      }
+         Packet response = null;
+         boolean flush = false;
+         boolean closeChannel = false;
+         boolean requiresResponse = false;
+
+         if (logger.isTraceEnabled()) {
+            logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
+         }
 
-      try {
          try {
-            switch (type) {
-               case SESS_CREATECONSUMER: {
-                  SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
-                  if (requiresResponse) {
-                     // We send back queue information on the queue as a response- this allows the queue to
-                     // be automatically recreated on failover
-                     QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
+            try {
+               switch (type) {
+                  case SESS_CREATECONSUMER: {
+                     SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
+                     if (requiresResponse) {
+                        // We send back queue information on the queue as a response- this allows the queue to
+                        // be automatically recreated on failover
+                        QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
+                        if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
+                           response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
+                        } else {
+                           response = new SessionQueueQueryResponseMessage(queueQueryResult);
+                        }
+                     }
+
+                     break;
+                  }
+                  case CREATE_QUEUE: {
+                     CreateQueueMessage request = (CreateQueueMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case CREATE_SHARED_QUEUE: {
+                     CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case DELETE_QUEUE: {
+                     requiresResponse = true;
+                     SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
+                     session.deleteQueue(request.getQueueName());
+                     response = new NullResponseMessage();
+                     break;
+                  }
+                  case SESS_QUEUEQUERY: {
+                     requiresResponse = true;
+                     SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
+                     QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                      if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
-                        response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
+                        response = new SessionQueueQueryResponseMessage_V2(result);
                      } else {
-                        response = new SessionQueueQueryResponseMessage(queueQueryResult);
+                        response = new SessionQueueQueryResponseMessage(result);
                      }
+                     break;
                   }
-
-                  break;
-               }
-               case CREATE_QUEUE: {
-                  CreateQueueMessage request = (CreateQueueMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable());
-                  if (requiresResponse) {
+                  case SESS_BINDINGQUERY: {
+                     requiresResponse = true;
+                     SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
+                     BindingQueryResult result = session.executeBindingQuery(request.getAddress());
+                     if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
+                        response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics());
+                     } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
+                        response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
+                     } else {
+                        response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
+                     }
+                     break;
+                  }
+                  case SESS_ACKNOWLEDGE: {
+                     SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.acknowledge(message.getConsumerID(), message.getMessageID());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_EXPIRED: {
+                     SessionExpireMessage message = (SessionExpireMessage) packet;
+                     session.expire(message.getConsumerID(), message.getMessageID());
+                     break;
+                  }
+                  case SESS_COMMIT: {
+                     requiresResponse = true;
+                     session.commit();
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case CREATE_SHARED_QUEUE: {
-                  CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString());
-                  if (requiresResponse) {
+                  case SESS_ROLLBACK: {
+                     requiresResponse = true;
+                     session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case DELETE_QUEUE: {
-                  requiresResponse = true;
-                  SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
-                  session.deleteQueue(request.getQueueName());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_QUEUEQUERY: {
-                  requiresResponse = true;
-                  SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
-                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
-                  if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
-                     response = new SessionQueueQueryResponseMessage_V2(result);
-                  } else {
-                     response = new SessionQueueQueryResponseMessage(result);
+                  case SESS_XA_COMMIT: {
+                     requiresResponse = true;
+                     SessionXACommitMessage message = (SessionXACommitMessage) packet;
+                     session.xaCommit(message.getXid(), message.isOnePhase());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
                   }
-                  break;
-               }
-               case SESS_BINDINGQUERY: {
-                  requiresResponse = true;
-                  SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
-                  BindingQueryResult result = session.executeBindingQuery(request.getAddress());
-                  if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
-                     response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics());
-                  } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
-                     response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
-                  } else {
-                     response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
+                  case SESS_XA_END: {
+                     requiresResponse = true;
+                     SessionXAEndMessage message = (SessionXAEndMessage) packet;
+                     session.xaEnd(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
                   }
-                  break;
-               }
-               case SESS_ACKNOWLEDGE: {
-                  SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.acknowledge(message.getConsumerID(), message.getMessageID());
-                  if (requiresResponse) {
+                  case SESS_XA_FORGET: {
+                     requiresResponse = true;
+                     SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
+                     session.xaForget(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_JOIN: {
+                     requiresResponse = true;
+                     SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
+                     session.xaJoin(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_RESUME: {
+                     requiresResponse = true;
+                     SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
+                     session.xaResume(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_ROLLBACK: {
+                     requiresResponse = true;
+                     SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
+                     session.xaRollback(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_START: {
+                     requiresResponse = true;
+                     SessionXAStartMessage message = (SessionXAStartMessage) packet;
+                     session.xaStart(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_FAILED: {
+                     requiresResponse = true;
+                     SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet;
+                     session.xaFailed(message.getXid());
+                     // no response on this case
+                     break;
+                  }
+                  case SESS_XA_SUSPEND: {
+                     requiresResponse = true;
+                     session.xaSuspend();
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_PREPARE: {
+                     requiresResponse = true;
+                     SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
+                     session.xaPrepare(message.getXid());
+                     response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_INDOUBT_XIDS: {
+                     requiresResponse = true;
+                     List<Xid> xids = session.xaGetInDoubtXids();
+                     response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+                     break;
+                  }
+                  case SESS_XA_GET_TIMEOUT: {
+                     requiresResponse = true;
+                     int timeout = session.xaGetTimeout();
+                     response = new SessionXAGetTimeoutResponseMessage(timeout);
+                     break;
+                  }
+                  case SESS_XA_SET_TIMEOUT: {
+                     requiresResponse = true;
+                     SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
+                     session.xaSetTimeout(message.getTimeoutSeconds());
+                     response = new SessionXASetTimeoutResponseMessage(true);
+                     break;
+                  }
+                  case SESS_START: {
+                     session.start();
+                     break;
+                  }
+                  case SESS_STOP: {
+                     requiresResponse = true;
+                     session.stop();
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case SESS_EXPIRED: {
-                  SessionExpireMessage message = (SessionExpireMessage) packet;
-                  session.expire(message.getConsumerID(), message.getMessageID());
-                  break;
-               }
-               case SESS_COMMIT: {
-                  requiresResponse = true;
-                  session.commit();
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_ROLLBACK: {
-                  requiresResponse = true;
-                  session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_XA_COMMIT: {
-                  requiresResponse = true;
-                  SessionXACommitMessage message = (SessionXACommitMessage) packet;
-                  session.xaCommit(message.getXid(), message.isOnePhase());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_END: {
-                  requiresResponse = true;
-                  SessionXAEndMessage message = (SessionXAEndMessage) packet;
-                  session.xaEnd(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_FORGET: {
-                  requiresResponse = true;
-                  SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
-                  session.xaForget(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_JOIN: {
-                  requiresResponse = true;
-                  SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
-                  session.xaJoin(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_RESUME: {
-                  requiresResponse = true;
-                  SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
-                  session.xaResume(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_ROLLBACK: {
-                  requiresResponse = true;
-                  SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
-                  session.xaRollback(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_START: {
-                  requiresResponse = true;
-                  SessionXAStartMessage message = (SessionXAStartMessage) packet;
-                  session.xaStart(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_FAILED: {
-                  requiresResponse = true;
-                  SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet;
-                  session.xaFailed(message.getXid());
-                  // no response on this case
-                  break;
-               }
-               case SESS_XA_SUSPEND: {
-                  requiresResponse = true;
-                  session.xaSuspend();
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_PREPARE: {
-                  requiresResponse = true;
-                  SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
-                  session.xaPrepare(message.getXid());
-                  response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_INDOUBT_XIDS: {
-                  requiresResponse = true;
-                  List<Xid> xids = session.xaGetInDoubtXids();
-                  response = new SessionXAGetInDoubtXidsResponseMessage(xids);
-                  break;
-               }
-               case SESS_XA_GET_TIMEOUT: {
-                  requiresResponse = true;
-                  int timeout = session.xaGetTimeout();
-                  response = new SessionXAGetTimeoutResponseMessage(timeout);
-                  break;
-               }
-               case SESS_XA_SET_TIMEOUT: {
-                  requiresResponse = true;
-                  SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
-                  session.xaSetTimeout(message.getTimeoutSeconds());
-                  response = new SessionXASetTimeoutResponseMessage(true);
-                  break;
-               }
-               case SESS_START: {
-                  session.start();
-                  break;
-               }
-               case SESS_STOP: {
-                  requiresResponse = true;
-                  session.stop();
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_CLOSE: {
-                  requiresResponse = true;
-                  session.close(false);
-                  // removeConnectionListeners();
-                  response = new NullResponseMessage();
-                  flush = true;
-                  closeChannel = true;
-                  break;
-               }
-               case SESS_INDIVIDUAL_ACKNOWLEDGE: {
-                  SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
-                  if (requiresResponse) {
+                  case SESS_CLOSE: {
+                     requiresResponse = true;
+                     session.close(false);
+                     // removeConnectionListeners();
                      response = new NullResponseMessage();
+                     flush = true;
+                     closeChannel = true;
+                     break;
                   }
-                  break;
-               }
-               case SESS_CONSUMER_CLOSE: {
-                  requiresResponse = true;
-                  SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
-                  session.closeConsumer(message.getConsumerID());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_FLOWTOKEN: {
-                  SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
-                  session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
-                  break;
-               }
-               case SESS_SEND: {
-                  SessionSendMessage message = (SessionSendMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.send((ServerMessage) message.getMessage(), direct);
-                  if (requiresResponse) {
+                  case SESS_INDIVIDUAL_ACKNOWLEDGE: {
+                     SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_CONSUMER_CLOSE: {
+                     requiresResponse = true;
+                     SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
+                     session.closeConsumer(message.getConsumerID());
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case SESS_SEND_LARGE: {
-                  SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
-                  session.sendLarge(message.getLargeMessage());
-                  break;
-               }
-               case SESS_SEND_CONTINUATION: {
-                  SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
-                  if (requiresResponse) {
+                  case SESS_FLOWTOKEN: {
+                     SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
+                     session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
+                     break;
+                  }
+                  case SESS_SEND: {
+                     SessionSendMessage message = (SessionSendMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.send((ServerMessage) message.getMessage(), direct);
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_SEND_LARGE: {
+                     SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
+                     session.sendLarge(message.getLargeMessage());
+                     break;
+                  }
+                  case SESS_SEND_CONTINUATION: {
+                     SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_FORCE_CONSUMER_DELIVERY: {
+                     SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
+                     session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
+                     break;
+                  }
+                  case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
+                     SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
+                     session.requestProducerCredits(message.getAddress(), message.getCredits());
+                     break;
+                  }
+                  case PacketImpl.SESS_ADD_METADATA: {
                      response = new NullResponseMessage();
+                     SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
+                     session.addMetaData(message.getKey(), message.getData());
+                     break;
+                  }
+                  case PacketImpl.SESS_ADD_METADATA2: {
+                     SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
+                     if (message.isRequiresConfirmations()) {
+                        response = new NullResponseMessage();
+                     }
+                     session.addMetaData(message.getKey(), message.getData());
+                     break;
+                  }
+                  case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
+                     SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
+                     if (session.addUniqueMetaData(message.getKey(), message.getData())) {
+                        response = new NullResponseMessage();
+                     } else {
+                        response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
+                     }
+                     break;
                   }
-                  break;
-               }
-               case SESS_FORCE_CONSUMER_DELIVERY: {
-                  SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
-                  session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
-                  break;
-               }
-               case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
-                  SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
-                  session.requestProducerCredits(message.getAddress(), message.getCredits());
-                  break;
                }
-               case PacketImpl.SESS_ADD_METADATA: {
-                  response = new NullResponseMessage();
-                  SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
-                  session.addMetaData(message.getKey(), message.getData());
-                  break;
+            } catch (ActiveMQIOErrorException e) {
+               getSession().markTXFailed(e);
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new ActiveMQExceptionMessage(e);
+               } else {
+                  ActiveMQServerLogger.LOGGER.caughtException(e);
                }
-               case PacketImpl.SESS_ADD_METADATA2: {
-                  SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
-                  if (message.isRequiresConfirmations()) {
-                     response = new NullResponseMessage();
-                  }
-                  session.addMetaData(message.getKey(), message.getData());
-                  break;
+            } catch (ActiveMQXAException e) {
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+               } else {
+                  ActiveMQServerLogger.LOGGER.caughtXaException(e);
                }
-               case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
-                  SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
-                  if (session.addUniqueMetaData(message.getKey(), message.getData())) {
-                     response = new NullResponseMessage();
+            } catch (ActiveMQException e) {
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new ActiveMQExceptionMessage(e);
+               } else {
+                  if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
+                     logger.debug("Caught exception", e);
                   } else {
-                     response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
+                     ActiveMQServerLogger.LOGGER.caughtException(e);
                   }
-                  break;
                }
-            }
-         } catch (ActiveMQIOErrorException e) {
-            getSession().markTXFailed(e);
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(e);
-            }
-         } catch (ActiveMQXAException e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtXaException(e);
-            }
-         } catch (ActiveMQException e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
-                  logger.debug("Caught exception", e);
+            } catch (Throwable t) {
+               getSession().markTXFailed(t);
+               if (requiresResponse) {
+                  ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
+                  ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
+                  activeMQInternalErrorException.initCause(t);
+                  response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
                } else {
-                  ActiveMQServerLogger.LOGGER.caughtException(e);
+                  ActiveMQServerLogger.LOGGER.caughtException(t);
                }
             }
-         } catch (Throwable t) {
-            getSession().markTXFailed(t);
-            if (requiresResponse) {
-               ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
-               ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
-               activeMQInternalErrorException.initCause(t);
-               response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(t);
-            }
-         }
 
-         sendResponse(packet, response, flush, closeChannel);
+            sendResponse(packet, response, flush, closeChannel);
+         } finally {
+            storageManager.clearContext();
+         }
       } finally {
-         storageManager.clearContext();
+         inHandler.get().set(false);
       }
    }
 
@@ -583,6 +594,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                                      final boolean flush,
                                      final boolean closeChannel) {
       if (confirmPacket != null) {
+         channel.confirm(confirmPacket);
+
          if (flush) {
             channel.flushConfirmations();
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index ad114e0..71e877f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -153,10 +153,15 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
          OperationContext sessionOperationContext = server.newOperationContext();
 
-         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
+         CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
+
+         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), session, server.getStorageManager(), channel);
          channel.setHandler(handler);
+         sessionCallback.setSessionHandler(handler);
+
+         channel.setHandler(handler);
 
          // TODO - where is this removed?
          protocolManager.addSessionHandler(request.getName(), handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index e35771e..75a98b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
@@ -44,6 +45,8 @@ public final class CoreSessionCallback implements SessionCallback {
 
    private String name;
 
+   private ServerSessionPacketHandler handler;
+
    public CoreSessionCallback(String name,
                               ProtocolManager protocolManager,
                               Channel channel,
@@ -54,6 +57,21 @@ public final class CoreSessionCallback implements SessionCallback {
       this.connection = connection;
    }
 
+   public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler handler) {
+      this.handler = handler;
+      return this;
+   }
+
+   @Override
+   public void close(boolean failed) {
+      ServerSessionPacketHandler localHandler = handler;
+      if (localHandler != null) {
+         // We wait any pending tasks before we make this as closed
+         localHandler.flushExecutor();
+      }
+      this.handler = null;
+   }
+
    @Override
    public boolean isWritable(ReadyListener callback) {
       return connection.isWritable(callback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 627b201..8b102bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -335,6 +335,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    protected void doClose(final boolean failed) throws Exception {
+      callback.close(failed);
       synchronized (this) {
          this.setStarted(false);
          if (closed)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 891f1ad..bb18986 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -78,4 +78,8 @@ public interface SessionCallback {
     * Some protocols (Openwire) needs a special message with the browser is finished.
     */
    void browserFinished(ServerConsumer consumer);
-}
+
+   default void close(boolean failed) {
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 60187f0..797ac60 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -38,6 +38,13 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
       return names;
    }
 
+   // it doesn't make sense through the core
+   // the pool will be shutdown while a connection is being used
+   // makes no sense!
+   @Override
+   public void testForceFailover() throws Exception {
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------