You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/01 04:20:18 UTC

[2/4] activemq-artemis git commit: major refactoring on Transactions and AMQ objects

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 77705fa..f3dd306 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -74,7 +75,6 @@ import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.Transaction.State;
-import org.apache.activemq.artemis.core.transaction.TransactionFactory;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -97,6 +97,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    // Attributes ----------------------------------------------------------------------------
 
+   private boolean securityEnabled = true;
+
    protected final String username;
 
    protected final String password;
@@ -169,32 +171,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    // concurrently.
    private volatile boolean closed = false;
 
-   private final TransactionFactory transactionFactory;
-
-   public ServerSessionImpl(final String name,
-                            final String username,
-                            final String password,
-                            final int minLargeMessageSize,
-                            final boolean autoCommitSends,
-                            final boolean autoCommitAcks,
-                            final boolean preAcknowledge,
-                            final boolean strictUpdateDeliveryCount,
-                            final boolean xa,
-                            final RemotingConnection remotingConnection,
-                            final StorageManager storageManager,
-                            final PostOffice postOffice,
-                            final ResourceManager resourceManager,
-                            final SecurityStore securityStore,
-                            final ManagementService managementService,
-                            final ActiveMQServer server,
-                            final SimpleString managementAddress,
-                            final SimpleString defaultAddress,
-                            final SessionCallback callback,
-                            final OperationContext context,
-                            final QueueCreator queueCreator) throws Exception {
-      this(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, strictUpdateDeliveryCount, xa, remotingConnection, storageManager, postOffice, resourceManager, securityStore, managementService, server, managementAddress, defaultAddress, callback, context, null, queueCreator);
-   }
-
    public ServerSessionImpl(final String name,
                             final String username,
                             final String password,
@@ -215,7 +191,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
                             final OperationContext context,
-                            TransactionFactory transactionFactory,
                             final QueueCreator queueCreator) throws Exception {
       this.username = username;
 
@@ -261,13 +236,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       this.queueCreator = queueCreator;
 
-      if (transactionFactory == null) {
-         this.transactionFactory = new DefaultTransactionFactory();
-      }
-      else {
-         this.transactionFactory = transactionFactory;
-      }
-
       if (!xa) {
          tx = newTransaction();
       }
@@ -275,6 +243,19 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    // ServerSession implementation ----------------------------------------------------------------------------
 
+   @Override
+   public void enableSecurity() {
+      this.securityEnabled = true;
+   }
+
+   @Override
+   public void disableSecurity() {
+      this.securityEnabled = false;
+   }
+
+   public boolean isClosed() {
+      return closed;
+   }
    /**
     * @return the sessionContext
     */
@@ -386,7 +367,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          remotingConnection.removeFailureListener(this);
 
-         callback.closed();
+         if (callback != null) {
+            callback.closed();
+         }
 
          closed = true;
       }
@@ -397,6 +380,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       return queueCreator;
    }
 
+   protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
+      if (securityEnabled) {
+         securityStore.check(address, checkType, auth);
+      }
+   }
+
    @Override
    public ServerConsumer createConsumer(final long consumerID,
                                         final SimpleString queueName,
@@ -417,11 +406,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
       }
 
-      securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
+      securityCheck(binding.getAddress(), CheckType.CONSUME, this);
 
       Filter filter = FilterImpl.createFilter(filterString);
 
-      ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
+      ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding)binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
       consumers.put(consumer.getID(), consumer);
 
       if (!browseOnly) {
@@ -465,20 +454,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       return consumer;
    }
 
-   protected ServerConsumer newConsumer(long consumerID,
-                                        ServerSessionImpl serverSessionImpl,
-                                        QueueBinding binding,
-                                        Filter filter,
-                                        boolean started2,
-                                        boolean browseOnly,
-                                        StorageManager storageManager2,
-                                        SessionCallback callback2,
-                                        boolean preAcknowledge2,
-                                        boolean strictUpdateDeliveryCount2,
-                                        ManagementService managementService2,
-                                        boolean supportLargeMessage,
-                                        Integer credits) throws Exception {
-      return new ServerConsumerImpl(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+   /** Some protocols may choose to hold their transactions outside of the ServerSession.
+    *  This can be used to replace the transaction.
+    *  Note that autoCommitAcks and autoCommitSends are set to true if transaction == null, and to false otherwise. */
+   public void resetTX(Transaction transaction) {
+      this.tx = transaction;
+      this.autoCommitAcks = transaction == null;
+      this.autoCommitSends = transaction == null;
    }
 
    @Override
@@ -489,10 +471,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final boolean durable) throws Exception {
       if (durable) {
          // make sure the user has privileges to create this queue
-         securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
+         securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
       }
       else {
-         securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
+         securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
       }
 
       server.checkQueueCreationLimit(getUsername());
@@ -537,7 +519,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                  final SimpleString name,
                                  boolean durable,
                                  final SimpleString filterString) throws Exception {
-      securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
+      securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
 
       server.checkQueueCreationLimit(getUsername());
 
@@ -632,7 +614,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception {
-      ServerConsumer consumer = consumers.get(consumerID);
+      ServerConsumer consumer = locateConsumer(consumerID);
 
       // this would be possible if the server consumer was closed by pings/pongs.. etc
       if (consumer != null) {
@@ -640,15 +622,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
    }
 
-   public void promptDelivery(long consumerID) {
-      ServerConsumer consumer = consumers.get(consumerID);
-
-      // this would be possible if the server consumer was closed by pings/pongs.. etc
-      if (consumer != null) {
-         consumer.promptDelivery();
-      }
-   }
-
    @Override
    public void acknowledge(final long consumerID, final long messageID) throws Exception {
       ServerConsumer consumer = findConsumer(consumerID);
@@ -674,8 +647,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
    }
 
+   public ServerConsumer locateConsumer(long consumerID) {
+      return consumers.get(consumerID);
+   }
+
    private ServerConsumer findConsumer(long consumerID) throws Exception {
-      ServerConsumer consumer = consumers.get(consumerID);
+      ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer == null) {
          Transaction currentTX = tx;
@@ -710,7 +687,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void individualCancel(final long consumerID, final long messageID, boolean failed) throws Exception {
-      ServerConsumer consumer = consumers.get(consumerID);
+      ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer != null) {
          consumer.individualCancel(messageID, failed);
@@ -720,7 +697,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void expire(final long consumerID, final long messageID) throws Exception {
-      MessageReference ref = consumers.get(consumerID).removeReferenceByID(messageID);
+      MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID);
 
       if (ref != null) {
          ref.getQueue().expire(ref);
@@ -778,8 +755,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    /**
     * @return
     */
-   protected Transaction newTransaction() {
-      return transactionFactory.newTransaction(null, storageManager, timeoutSeconds);
+   public Transaction newTransaction() {
+      return new TransactionImpl(null, storageManager, timeoutSeconds);
    }
 
    /**
@@ -787,7 +764,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
     * @return
     */
    private Transaction newTransaction(final Xid xid) {
-      return transactionFactory.newTransaction(xid, storageManager, timeoutSeconds);
+      return new TransactionImpl(xid, storageManager, timeoutSeconds);
    }
 
    @Override
@@ -1189,7 +1166,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void closeConsumer(final long consumerID) throws Exception {
-      final ServerConsumer consumer = consumers.get(consumerID);
+      final ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer != null) {
          consumer.close(false);
@@ -1201,7 +1178,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception {
-      ServerConsumer consumer = consumers.get(consumerID);
+      ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer == null) {
          ActiveMQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
@@ -1214,9 +1191,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public Transaction getCurrentTransaction() {
-      if (tx == null) {
-         tx = newTransaction();
-      }
       return tx;
    }
 
@@ -1489,7 +1463,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception {
       try {
-         securityStore.check(message.getAddress(), CheckType.MANAGE, this);
+         securityCheck(message.getAddress(), CheckType.MANAGE, this);
       }
       catch (ActiveMQException e) {
          if (!autoCommitSends) {
@@ -1564,7 +1538,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
       // check the user has write access to this address.
       try {
-         securityStore.check(msg.getAddress(), CheckType.SEND, this);
+         securityCheck(msg.getAddress(), CheckType.SEND, this);
       }
       catch (ActiveMQException e) {
          if (!autoCommitSends && tx != null) {
@@ -1613,12 +1587,4 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          return Collections.emptyList();
       }
    }
-
-   private static class DefaultTransactionFactory implements TransactionFactory {
-
-      @Override
-      public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) {
-         return new TransactionImpl(xid, storageManager, timeoutSeconds);
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index eb1ab3c..9b54851 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -22,6 +22,7 @@ import javax.transaction.xa.Xid;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
 
 /**
@@ -33,6 +34,12 @@ public interface Transaction {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
    }
 
+   Object getProtocolData();
+
+   /** Protocol managers can use this field to store any object needed.
+    *  An example would be the Session used by the transaction on openwire */
+   void setProtocolData(Object data);
+
    boolean isEffective();
 
    void prepare() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
deleted file mode 100644
index 5e97826..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.transaction;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-
-import javax.transaction.xa.Xid;
-
-public interface TransactionFactory {
-
-   Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index db55aa8..5f08e61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -59,6 +59,18 @@ public class TransactionImpl implements Transaction {
 
    private int timeoutSeconds = -1;
 
+   private Object protocolData;
+
+   @Override
+   public Object getProtocolData() {
+      return protocolData;
+   }
+
+   @Override
+   public void setProtocolData(Object protocolData) {
+      this.protocolData = protocolData;
+   }
+
    public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
       this.storageManager = storageManager;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index a9eb0f2..cf0ec69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.spi.core.protocol;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -36,9 +37,15 @@ public interface SessionCallback {
 
    void sendProducerCreditsFailMessage(int credits, SimpleString address);
 
-   int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount);
+   // Note: don't be tempted to remove the message parameter.
+   //       Even though ref will contain the message, in certain cases such as paging
+   //       the message could be a SoftReference or WeakReference,
+   //       and I wanted to avoid re-fetching paged data in case of GCs in this specific case.
+   //
+   //       Future developments may change this, but be aware of why I have chosen to keep the parameter separate here.
+   int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumerID, int deliveryCount);
 
-   int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount);
+   int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount);
 
    int sendLargeMessageContinuation(ServerConsumer consumerID,
                                     byte[] body,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 22ec438..53edb79 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1106,6 +1106,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+      }
+
+      @Override
       public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
          return false;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 8403ee3..45f8b30 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -567,12 +567,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          connection = cf.createConnection();
          connection.start();
          connections.add(connection);
-         final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
 
          connection = cf.createConnection();
          connection.start();
          connections.add(connection);
-         final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
 
          final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
          final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
@@ -583,7 +583,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          final Vector<Message> receivedMessages = new Vector<>();
          final CountDownLatch commitDoneLatch = new CountDownLatch(1);
          final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
-         new Thread() {
+         Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
             public void run() {
                LOG.info("doing async commit after consume...");
                try {
@@ -630,10 +630,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
                   e.printStackTrace();
                }
             }
-         }.start();
+         };
+         t.start();
 
          // will be stopped by the plugin
          brokerStopLatch.await(60, TimeUnit.SECONDS);
+         t.join();
          broker = createBroker();
          broker.start();
          doByteman.set(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 54ae6c8..9702624 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -507,7 +508,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
        */
       @Override
-      public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+      public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
          inCall.countDown();
          try {
             callbackSemaphore.acquire();
@@ -518,7 +519,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
 
          try {
-            return targetCallback.sendMessage(message, consumer, deliveryCount);
+            return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
          }
          finally {
             callbackSemaphore.release();
@@ -530,8 +531,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
       @Override
-      public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
-         return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
+      public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+         return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
       }
 
       /* (non-Javadoc)
@@ -581,7 +582,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback callback,
                                                         OperationContext context,
-                                                        ServerSessionFactory sessionFactory,
                                                         boolean autoCreateQueue) throws Exception {
          return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index a1a5e38..14cfee0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,6 +26,7 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -118,7 +119,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
    }
 
    @Test
-   public void testSendnReceiveAuthorization() throws Exception {
+      public void testSendnReceiveAuthorization() throws Exception {
       Connection sendingConn = null;
       Connection receivingConn = null;
 
@@ -152,16 +153,18 @@ public class BasicSecurityTest extends BasicOpenWireTest {
          producer = sendingSession.createProducer(dest);
          producer.send(message);
 
-         MessageConsumer consumer = null;
+         MessageConsumer consumer;
          try {
             consumer = sendingSession.createConsumer(dest);
+            Assert.fail("exception expected");
          }
          catch (JMSSecurityException e) {
+            e.printStackTrace();
             //expected
          }
 
          consumer = receivingSession.createConsumer(dest);
-         TextMessage received = (TextMessage) consumer.receive();
+         TextMessage received = (TextMessage) consumer.receive(5000);
 
          assertNotNull(received);
          assertEquals("Hello World", received.getText());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 825b8b5..69d9784 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.junit.Test;
 
 public class OpenWireUtilTest {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index b87fc7d..c4aea03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.openwire;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -27,24 +26,27 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 public class SimpleOpenWireTest extends BasicOpenWireTest {
 
-   @Rule
-   public ExpectedException thrown = ExpectedException.none();
-
    @Override
    @Before
    public void setUp() throws Exception {
@@ -53,6 +55,158 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testSimple() throws Exception {
+      Connection connection = factory.createConnection();
+
+      Collection<Session> sessions = new LinkedList<>();
+
+      for (int i = 0; i < 10; i++) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         sessions.add(session);
+      }
+
+      connection.close();
+   }
+
+   @Test
+   public void testTransactionalSimple() throws Exception {
+      try (Connection connection = factory.createConnection()) {
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+         System.out.println("Queue:" + queue);
+         MessageProducer producer = session.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         producer.send(session.createTextMessage("test"));
+         session.commit();
+
+         Assert.assertNull(consumer.receive(100));
+         connection.start();
+
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertEquals("test", message.getText());
+
+         Assert.assertNotNull(message);
+
+         message.acknowledge();
+      }
+   }
+
+   @Test
+   public void testXASimple() throws Exception {
+      XAConnection connection = xaFactory.createXAConnection();
+
+      Collection<Session> sessions = new LinkedList<>();
+
+      for (int i = 0; i < 10; i++) {
+         XASession session = connection.createXASession();
+         session.getXAResource().start(newXID(), XAResource.TMNOFLAGS);
+         sessions.add(session);
+      }
+
+      connection.close();
+
+   }
+
+   @Test
+   public void testClientACK() throws Exception {
+      try {
+
+         Connection connection = factory.createConnection();
+
+         Collection<Session> sessions = new LinkedList<>();
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         System.out.println("Queue:" + queue);
+         MessageProducer producer = session.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         producer.send(session.createTextMessage("test"));
+
+         Assert.assertNull(consumer.receive(100));
+         connection.start();
+
+         TextMessage message = (TextMessage) consumer.receive(5000);
+
+         Assert.assertNotNull(message);
+
+         message.acknowledge();
+
+         connection.close();
+
+         System.err.println("Done!!!");
+      }
+      catch (Throwable e) {
+         e.printStackTrace();
+      }
+   }
+
+   @Test
+   public void testRollback() throws Exception {
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+         System.out.println("Queue:" + queue);
+         MessageProducer producer = session.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         producer.send(session.createTextMessage("test"));
+         producer.send(session.createTextMessage("test2"));
+         connection.start();
+         Assert.assertNull(consumer.receiveNoWait());
+         session.rollback();
+         producer.send(session.createTextMessage("test2"));
+         Assert.assertNull(consumer.receiveNoWait());
+         session.commit();
+         TextMessage msg = (TextMessage) consumer.receive(1000);
+
+         Assert.assertNotNull(msg);
+         Assert.assertEquals("test2", msg.getText());
+      }
+   }
+
+   @Test
+   public void testAutoAck() throws Exception {
+      Connection connection = factory.createConnection();
+
+      Collection<Session> sessions = new LinkedList<>();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      System.out.println("Queue:" + queue);
+      MessageProducer producer = session.createProducer(queue);
+      MessageConsumer consumer = session.createConsumer(queue);
+      TextMessage msg = session.createTextMessage("test");
+      msg.setStringProperty("abc", "testAutoACK");
+      producer.send(msg);
+
+      Assert.assertNull(consumer.receive(100));
+      connection.start();
+
+      TextMessage message = (TextMessage) consumer.receive(5000);
+
+      Assert.assertNotNull(message);
+
+      connection.close();
+
+      System.err.println("Done!!!");
+   }
+
+   @Test
+   public void testProducerFlowControl() throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+
+      factory.setProducerWindowSize(1024 * 64);
+
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("test"));
+
+      connection.close();
+   }
+
+   @Test
    public void testSimpleQueue() throws Exception {
       connection.start();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -88,12 +242,11 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       session.close();
    }
 
-
-   @Test
+   //   @Test -- ignored for now
    public void testKeepAlive() throws Exception {
       connection.start();
 
-      Thread.sleep(125000);
+      Thread.sleep(30000);
 
       connection.createSession(false, 1);
    }
@@ -237,9 +390,11 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Queue queue = session.createQueue("foo");
 
-      thrown.expect(InvalidDestinationException.class);
-      thrown.expect(JMSException.class);
-      session.createProducer(queue);
+      try {
+         session.createProducer(queue);
+      }
+      catch (JMSException expected) {
+      }
       session.close();
    }
 
@@ -390,7 +545,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    }
 
-
    /**
     * This is the example shipped with the distribution
     *
@@ -473,7 +627,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    }
 
-
    // simple test sending openwire, consuming core
    @Test
    public void testMixedOpenWireExample2() throws Exception {
@@ -513,5 +666,396 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       conn2.close();
    }
 
+   @Test
+   public void testXAConsumer() throws Exception {
+      Queue queue;
+      try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+         queue = session.createQueue(queueName);
+         System.out.println("Queue:" + queue);
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage msg = session.createTextMessage("test" + i);
+            msg.setStringProperty("myobj", "test" + i);
+            producer.send(msg);
+         }
+         session.close();
+      }
+
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         Xid xid = newXID();
+
+         XASession session = xaconnection.createXASession();
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         MessageConsumer consumer = session.createConsumer(queue);
+         xaconnection.start();
+         for (int i = 0; i < 5; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("test" + i, message.getText());
+         }
+         session.getXAResource().end(xid, XAResource.TMSUCCESS);
+         session.getXAResource().rollback(xid);
+         consumer.close();
+         xaconnection.close();
+      }
+
+      try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            //            Assert.assertEquals("test" + i, message.getText());
+            System.out.println("Message " + message.getText());
+         }
+         checkDuplicate(consumer);
+         System.out.println("Queue:" + queue);
+         session.close();
+      }
+
+      System.err.println("Done!!!");
+   }
+
+   @Test
+   public void testXASameConsumerRollback() throws Exception {
+      Queue queue;
+      try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+         queue = session.createQueue(queueName);
+         System.out.println("Queue:" + queue);
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage msg = session.createTextMessage("test" + i);
+            msg.setStringProperty("myobj", "test" + i);
+            producer.send(msg);
+         }
+         session.close();
+      }
+
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         Xid xid = newXID();
+
+         XASession session = xaconnection.createXASession();
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         MessageConsumer consumer = session.createConsumer(queue);
+         xaconnection.start();
+         for (int i = 0; i < 5; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("test" + i, message.getText());
+         }
+         session.getXAResource().end(xid, XAResource.TMSUCCESS);
+         session.getXAResource().rollback(xid);
+
+         xid = newXID();
+         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
+
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("test" + i, message.getText());
+         }
+
+         checkDuplicate(consumer);
+
+         session.getXAResource().end(xid, XAResource.TMSUCCESS);
+         session.getXAResource().commit(xid, true);
+      }
+   }
+
+   @Test
+   public void testXAPrepare() throws Exception {
+      try {
+
+         XAConnection connection = xaFactory.createXAConnection();
+
+         XASession xasession = connection.createXASession();
+
+         Xid xid = newXID();
+         xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         Queue queue = xasession.createQueue(queueName);
+         MessageProducer producer = xasession.createProducer(queue);
+         producer.send(xasession.createTextMessage("hello"));
+         producer.send(xasession.createTextMessage("hello"));
+         xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
+
+         xasession.getXAResource().prepare(xid);
+
+         connection.close();
+
+         System.err.println("Done!!!");
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+   @Test
+   public void testAutoSend() throws Exception {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         producer.send(session.createTextMessage("testXX" + i));
+      }
+      connection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+   }
+
+   @Test
+   public void testCommitCloseConsumerBefore() throws Exception {
+      testCommitCloseConsumer(true);
+   }
+
+   @Test
+   public void testCommitCloseConsumerAfter() throws Exception {
+      testCommitCloseConsumer(false);
+   }
+
+   private void testCommitCloseConsumer(boolean closeBefore) throws Exception {
+      connection.start();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage msg = session.createTextMessage("testXX" + i);
+         msg.setStringProperty("count", "str " + i);
+         producer.send(msg);
+      }
+      session.commit();
+      connection.start();
+
+      for (int i = 0; i < 5; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+      if (closeBefore) {
+         consumer.close();
+      }
+
+      session.commit();
+
+      // we're testing two scenarios.
+      // closing the consumer before commit or after commit
+      if (!closeBefore) {
+         consumer.close();
+      }
+
+      consumer = session.createConsumer(queue);
+      //      Assert.assertNull(consumer.receiveNoWait());
+      for (int i = 5; i < 10; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      Assert.assertNull(consumer.receiveNoWait());
+
+   }
+
+   @Test
+   public void testRollbackWithAcked() throws Exception {
+      connection.start();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage msg = session.createTextMessage("testXX" + i);
+         msg.setStringProperty("count", "str " + i);
+         producer.send(msg);
+      }
+      session.commit();
+      connection.start();
+
+      for (int i = 0; i < 5; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      session.rollback();
+
+      consumer.close();
+
+      consumer = session.createConsumer(queue);
+      //      Assert.assertNull(consumer.receiveNoWait());
+      for (int i = 0; i < 10; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         //         System.out.println("TXT::" + txt);
+         Assert.assertNotNull(txt);
+         System.out.println("TXT " + txt.getText());
+         //         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+      session.commit();
+
+      checkDuplicate(consumer);
+
+   }
+
+   @Test
+   public void testRollbackLocal() throws Exception {
+      connection.start();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage msg = session.createTextMessage("testXX" + i);
+         msg.setStringProperty("count", "str " + i);
+         producer.send(msg);
+      }
+      session.commit();
+      connection.start();
+
+      for (int i = 0; i < 5; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(500);
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      session.rollback();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(txt);
+         System.out.println("TXT " + txt.getText());
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      checkDuplicate(consumer);
+
+      session.commit();
+
+   }
+
+   private void checkDuplicate(MessageConsumer consumer) throws JMSException {
+      boolean duplicatedMessages = false;
+      while (true) {
+         TextMessage txt = (TextMessage) consumer.receiveNoWait();
+         if (txt == null) {
+            break;
+         }
+         else {
+            duplicatedMessages = true;
+            System.out.println("received in duplicate:" + txt.getText());
+         }
+      }
+
+      Assert.assertFalse("received messages in duplicate", duplicatedMessages);
+   }
+
+   @Test
+   public void testIndividualAck() throws Exception {
+      connection.start();
+      Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage msg = session.createTextMessage("testXX" + i);
+         msg.setStringProperty("count", "str " + i);
+         producer.send(msg);
+      }
+      connection.start();
+
+      for (int i = 0; i < 5; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         if (i == 4) {
+            txt.acknowledge();
+         }
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      consumer.close();
+
+      consumer = session.createConsumer(queue);
+      //      Assert.assertNull(consumer.receiveNoWait());
+      for (int i = 0; i < 4; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         txt.acknowledge();
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      for (int i = 5; i < 10; i++) {
+         TextMessage txt = (TextMessage) consumer.receive(5000);
+         txt.acknowledge();
+         Assert.assertEquals("testXX" + i, txt.getText());
+      }
+
+      checkDuplicate(consumer);
+
+      Assert.assertNull(consumer.receiveNoWait());
+
+   }
+
+   @Test
+   public void testCommitCloseConsumeXA() throws Exception {
+
+      Queue queue;
+      {
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+         queue = session.createQueue(queueName);
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage msg = session.createTextMessage("testXX" + i);
+            msg.setStringProperty("count", "str " + i);
+            producer.send(msg);
+         }
+         session.commit();
+      }
+
+      try (XAConnection xaconnection = xaFactory.createXAConnection()) {
+         xaconnection.start();
+
+         XASession xasession = xaconnection.createXASession();
+         Xid xid = newXID();
+         xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
+         MessageConsumer consumer = xasession.createConsumer(queue);
+
+         for (int i = 0; i < 5; i++) {
+            TextMessage txt = (TextMessage) consumer.receive(5000);
+            Assert.assertEquals("testXX" + i, txt.getText());
+         }
+
+         consumer.close();
+
+         xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
+         xasession.getXAResource().prepare(xid);
+         xasession.getXAResource().commit(xid, false);
+
+         xaconnection.close();
+      }
+
+      {
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         try (MessageConsumer consumer = session.createConsumer(queue)) {
+            for (int i = 5; i < 10; i++) {
+               TextMessage txt = (TextMessage) consumer.receive(5000);
+               Assert.assertEquals("testXX" + i, txt.getText());
+            }
+         }
+
+      }
+
+   }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
deleted file mode 100644
index da4ecb3..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.integration.openwire.investigations;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class InvestigationOpenwireTest extends BasicOpenWireTest {
-
-   @Test
-   public void testSimple() throws Exception {
-      try {
-
-         Connection connection = factory.createConnection();
-         //      Thread.sleep(5000);
-
-         Collection<Session> sessions = new LinkedList<>();
-
-         for (int i = 0; i < 10; i++) {
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            sessions.add(session);
-         }
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Test
-   public void testProducerFlowControl() throws Exception {
-      try {
-
-         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
-
-         factory.setProducerWindowSize(1024 * 64);
-
-         Connection connection = factory.createConnection();
-         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         Queue queue = session.createQueue(queueName);
-         MessageProducer producer = session.createProducer(queue);
-         producer.send(session.createTextMessage("test"));
-
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Test
-   public void testAutoAck() throws Exception {
-      try {
-
-         Connection connection = factory.createConnection();
-         //      Thread.sleep(5000);
-
-         Collection<Session> sessions = new LinkedList<>();
-
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Queue queue = session.createQueue(queueName);
-         System.out.println("Queue:" + queue);
-         MessageProducer producer = session.createProducer(queue);
-         MessageConsumer consumer = session.createConsumer(queue);
-         producer.send(session.createTextMessage("test"));
-
-         Assert.assertNull(consumer.receive(100));
-         connection.start();
-
-         TextMessage message = (TextMessage)consumer.receive(5000);
-
-         Assert.assertNotNull(message);
-
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Throwable e) {
-         e.printStackTrace();
-      }
-   }
-
-
-
-   @Test
-   public void testRollback() throws Exception {
-      try {
-
-         Connection connection = factory.createConnection();
-         //      Thread.sleep(5000);
-
-         Collection<Session> sessions = new LinkedList<>();
-
-         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-         Queue queue = session.createQueue(queueName);
-         System.out.println("Queue:" + queue);
-         MessageProducer producer = session.createProducer(queue);
-         MessageConsumer consumer = session.createConsumer(queue);
-         producer.send(session.createTextMessage("test"));
-         producer.send(session.createTextMessage("test2"));
-         connection.start();
-         Assert.assertNull(consumer.receive(1000));
-         session.rollback();
-         producer.send(session.createTextMessage("test2"));
-         Assert.assertNull(consumer.receive(1000));
-         session.commit();
-         TextMessage msg = (TextMessage)consumer.receive(1000);
-
-
-         Assert.assertNotNull(msg);
-         Assert.assertEquals("test2", msg.getText());
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Throwable e) {
-         e.printStackTrace();
-      }
-   }
-
-
-
-   @Test
-   public void testClientACK() throws Exception {
-      try {
-
-         Connection connection = factory.createConnection();
-         //      Thread.sleep(5000);
-
-         Collection<Session> sessions = new LinkedList<>();
-
-         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         Queue queue = session.createQueue(queueName);
-         System.out.println("Queue:" + queue);
-         MessageProducer producer = session.createProducer(queue);
-         MessageConsumer consumer = session.createConsumer(queue);
-         producer.send(session.createTextMessage("test"));
-
-         Assert.assertNull(consumer.receive(100));
-         connection.start();
-
-         TextMessage message = (TextMessage)consumer.receive(5000);
-
-         Assert.assertNotNull(message);
-
-         message.acknowledge();
-
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Throwable e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Test
-   public void testXASimple() throws Exception {
-      try {
-
-         XAConnection connection = xaFactory.createXAConnection();
-         //      Thread.sleep(5000);
-
-         Collection<Session> sessions = new LinkedList<>();
-
-         for (int i = 0; i < 10; i++) {
-            XASession session = connection.createXASession();
-            session.getXAResource().start(newXID(), XAResource.TMNOFLAGS);
-            sessions.add(session);
-         }
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-
-   @Test
-   public void testXAPrepare() throws Exception {
-      try {
-
-         XAConnection connection = xaFactory.createXAConnection();
-         //      Thread.sleep(5000);
-
-
-         XASession session = connection.createXASession();
-
-         Xid xid = newXID();
-         session.getXAResource().start(xid, XAResource.TMNOFLAGS);
-         Queue queue = session.createQueue(queueName);
-         MessageProducer producer = session.createProducer(queue);
-         producer.send(session.createTextMessage("hello"));
-         session.getXAResource().end(xid, XAResource.TMSUCCESS);
-         session.getXAResource().prepare(xid);
-
-         connection.close();
-
-         System.err.println("Done!!!");
-      }
-      catch (Exception e) {
-         e.printStackTrace();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index a644718..805a6f5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -105,6 +105,16 @@ public class BindingsImplTest extends ActiveMQTestBase {
    private final class FakeTransaction implements Transaction {
 
       @Override
+      public Object getProtocolData() {
+         return null;
+      }
+
+      @Override
+      public void setProtocolData(Object data) {
+
+      }
+
+      @Override
       public void addOperation(final TransactionOperation sync) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fb445681/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 99d01e6..78659d2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -42,6 +42,11 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+
+   }
+
+   @Override
    public void deleteQueue(boolean removeConsumers) throws Exception {
    }