You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/19 06:09:03 UTC

[57/67] [abbrv] activemq-artemis git commit: producer refactor

producer refactor


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e23c74fd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e23c74fd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e23c74fd

Branch: refs/heads/refactor-openwire
Commit: e23c74fdbb39a3327327906d621e6f59c1b42f34
Parents: 9b165cc
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 1 22:31:39 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Mar 19 01:07:37 2016 -0400

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnection.java    |   4 +
 .../artemis/spi/core/remoting/Connection.java   |   6 +
 .../protocol/openwire/OpenWireConnection.java   | 163 ++++-------
 .../openwire/OpenWireProtocolManager.java       |  15 +-
 .../core/protocol/openwire/SendingResult.java   |  57 ----
 .../core/protocol/openwire/amq/AMQSession.java  | 281 +++++++++----------
 .../openwire/impl/OpenWireServerCallback.java   |  75 -----
 .../artemis/core/paging/PagingStore.java        |   2 +
 .../core/remoting/impl/invm/InVMConnection.java |   6 +
 .../FailoverConsumerOutstandingCommitTest.java  |  25 +-
 .../InvestigationOpenwireTest.java              |  27 ++
 .../storage/PersistMultiThreadTest.java         |   4 +
 12 files changed, 255 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 9268699..3f10227 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -100,6 +100,10 @@ public class NettyConnection implements Connection {
    }
    // Connection implementation ----------------------------
 
+   @Override
+   public void setAutoRead(boolean autoRead) {
+      channel.config().setAutoRead(autoRead);
+   }
 
    @Override
    public synchronized boolean isWritable(ReadyListener callback) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ed10113..4352d49 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -44,6 +44,12 @@ public interface Connection {
    void fireReady(boolean ready);
 
    /**
+    * This will disable reading from the channel.
+    * This is basically the same as blocking the reading.
+    * */
+   void setAutoRead(boolean autoRead);
+
+   /**
     * returns the unique id of this wire.
     *
     * @return the id

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0fd8dc2..1e1e953 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
-import javax.jms.ResourceAllocationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -270,22 +269,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       catch (Exception e) {
          ActiveMQServerLogger.LOGGER.debug(e);
 
-         Response resp;
-         if (e instanceof ActiveMQSecurityException) {
-            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-         }
-         else if (e instanceof ActiveMQNonExistentQueueException) {
-            resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
-         }
-         else {
-            resp = new ExceptionResponse(e);
-         }
-         try {
-            dispatch(resp);
-         }
-         catch (IOException e2) {
-            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
-         }
+         sendException(e);
+      }
+   }
+
+   public void sendException(Exception e) {
+      Response resp;
+      if (e instanceof ActiveMQSecurityException) {
+         resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+      }
+      else if (e instanceof ActiveMQNonExistentQueueException) {
+         resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+      }
+      else {
+         resp = new ExceptionResponse(e);
+      }
+      try {
+         dispatch(resp);
+      }
+      catch (IOException e2) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
       }
    }
 
@@ -371,75 +374,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    }
 
-   public void dispatchAsync(Command message) {
-      if (!stopping.get()) {
-         dispatchSync(message);
-      }
-      else {
-         if (message.isMessageDispatch()) {
-            MessageDispatch md = (MessageDispatch) message;
-            TransmitCallback sub = md.getTransmitCallback();
-            protocolManager.postProcessDispatch(md);
-            if (sub != null) {
-               sub.onFailure();
-            }
-         }
-      }
-   }
-
-   public void dispatchSync(Command message) {
-      try {
-         processDispatch(message);
-      }
-      catch (IOException e) {
-         serviceExceptionAsync(e);
-      }
-   }
-
-   public void serviceExceptionAsync(final IOException e) {
-      if (asyncException.compareAndSet(false, true)) {
-         // TODO: Why this is not through an executor?
-         new Thread("Async Exception Handler") {
-            @Override
-            public void run() {
-               serviceException(e);
-            }
-         }.start();
-      }
+   public void dispatchAsync(Command message) throws Exception {
+      dispatchSync(message);
    }
 
-   public void serviceException(Throwable e) {
-      // are we a transport exception such as not being able to dispatch
-      // synchronously to a transport
-      if (e instanceof IOException) {
-         serviceTransportException((IOException) e);
-      }
-      else if (!stopping.get() && !inServiceException) {
-         inServiceException = true;
-         try {
-            ConnectionError ce = new ConnectionError();
-            ce.setException(e);
-            dispatchAsync(ce);
-         }
-         finally {
-            inServiceException = false;
-         }
-      }
+   public void dispatchSync(Command message) throws Exception {
+      processDispatch(message);
    }
 
-   public void serviceTransportException(IOException e) {
-      /*
-       * deal with it later BrokerService bService =
-       * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure())
-       * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) {
-       * LOG.error("Slave has exception: {} shutting down master now.",
-       * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception
-       * ex) { LOG.warn("Failed to stop the master", ex); } } } } if
-       * (!stopping.get() && !pendingStop) { transportException.set(e); if
-       * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: "
-       * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
-       * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); }
-       */
+   public void serviceException(Throwable e) throws Exception {
+      ConnectionError ce = new ConnectionError();
+      ce.setException(e);
+      dispatchAsync(ce);
    }
 
    protected void dispatch(Command command) throws IOException {
@@ -570,7 +516,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
-   private void disconnect(ActiveMQException me, String reason, boolean fail) {
+   private void disconnect(ActiveMQException me, String reason, boolean fail)  {
 
       if (context == null || destroyed) {
          return;
@@ -596,8 +542,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       if (command != null && command.isResponseRequired()) {
          Response lastResponse = new Response();
          lastResponse.setCorrelationId(command.getCommandId());
-         dispatchSync(lastResponse);
-         context.setDontSendReponse(true);
+         try {
+            dispatchSync(lastResponse);
+         }
+         catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         }
       }
    }
 
@@ -632,12 +582,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return this.context;
    }
 
-   public void updateClient(ConnectionControl control) {
-      //      if (!destroyed && context.isFaultTolerant()) {
+   public void updateClient(ConnectionControl control) throws Exception {
       if (protocolManager.isUpdateClusterClients()) {
          dispatchAsync(control);
       }
-      //      }
    }
 
    public AMQConnectionContext initContext(ConnectionInfo info) {
@@ -1063,9 +1011,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-         protocolManager.commitTransactionOnePhase(info);
-         TransactionId txId = info.getTransactionId();
-         txMap.remove(txId);
+         new Exception("commit").printStackTrace();
+         try {
+            protocolManager.commitTransactionOnePhase(info);
+            TransactionId txId = info.getTransactionId();
+            txMap.remove(txId);
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+         }
 
          return null;
       }
@@ -1150,33 +1105,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          AMQSession session = getSession(producerId.getParentId());
 
-         SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
-         if (result.isBlockNextSend()) {
-            if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
-               // TODO see logging
-               throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
-            }
-
-            if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
-               //in that case don't send the response
-               //this will force the client to wait until
-               //the response is got.
-               context.setDontSendReponse(true);
-            }
-            else {
-               //hang the connection until the space is available
-               session.blockingWaitForSpace(producerExchange, result);
-            }
-         }
-         else if (sendProducerAck) {
-            // TODO-now: send through OperationContext
-            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-            OpenWireConnection.this.dispatchAsync(ack);
-         }
-
+         session.send(producerInfo, messageSend, sendProducerAck);
          return null;
       }
 
+
       @Override
       public Response processMessageAck(MessageAck ack) throws Exception {
          AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 51c4bec..7445960 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerE
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
@@ -192,7 +193,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
       for (OpenWireConnection c : this.connections) {
          ConnectionControl control = newConnectionControl();
-         c.updateClient(control);
+         try {
+            c.updateClient(control);
+         }
+         catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            c.sendException(e);
+         }
       }
    }
 
@@ -365,7 +372,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
          context.setProducerFlowControl(false);
          AMQSession sess = context.getConnection().getAdvisorySession();
          if (sess != null) {
-            sess.send(producerExchange, advisoryMessage, false);
+            sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
          }
       }
       finally {
@@ -515,7 +522,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       server.destroyQueue(subQueueName);
    }
 
-   public void sendBrokerInfo(OpenWireConnection connection) {
+   public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
       BrokerInfo brokerInfo = new BrokerInfo();
       brokerInfo.setBrokerName(server.getIdentity());
       brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
@@ -525,7 +532,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
       //cluster support yet to support
       brokerInfo.setPeerBrokerInfos(null);
-      connection.dispatchAsync(brokerInfo);
+      connection.dispatch(brokerInfo);
    }
 
    public void setRebalanceClusterClients(boolean rebalance) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
deleted file mode 100644
index 0e21ca4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-
-public class SendingResult {
-
-   private boolean blockNextSend;
-   private PagingStoreImpl blockPagingStore;
-   private SimpleString blockingAddress;
-
-   public void setBlockNextSend(boolean block) {
-      this.blockNextSend = block;
-   }
-
-   public boolean isBlockNextSend() {
-      return this.blockNextSend;
-   }
-
-   public void setBlockPagingStore(PagingStoreImpl store) {
-      this.blockPagingStore = store;
-   }
-
-   public PagingStoreImpl getBlockPagingStore() {
-      return this.blockPagingStore;
-   }
-
-   public void setBlockingAddress(SimpleString address) {
-      this.blockingAddress = address;
-   }
-
-   public SimpleString getBlockingAddress() {
-      return this.blockingAddress;
-   }
-
-   public boolean isSendFailIfNoSpace() {
-      AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
-      return policy == AddressFullMessagePolicy.FAIL;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4db5967..b68861e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
+import javax.jms.ResourceAllocationException;
 import javax.transaction.xa.Xid;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -26,41 +26,38 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -106,6 +103,10 @@ public class AMQSession implements SessionCallback {
       this.converter = new OpenWireMessageConverter(marshaller.copy());
    }
 
+   public OpenWireMessageConverter getConverter() {
+      return converter;
+   }
+
    public void initialize() {
       String name = sessInfo.getSessionId().toString();
       String username = connInfo.getUserName();
@@ -226,25 +227,10 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   public AMQServerSession getCoreSession() {
-      return this.coreSession;
-   }
-
-   public ActiveMQServer getCoreServer() {
-      return this.server;
-   }
-
-   public void removeConsumer(long consumerId) throws Exception {
-      boolean failed = !(this.txId != null || this.isTx);
-
-      coreSession.amqCloseConsumer(consumerId, failed);
-      consumers.remove(consumerId);
-   }
 
-   public SendingResult send(AMQProducerBrokerExchange producerExchange,
-                             Message messageSend,
-                             boolean sendProducerAck) throws Exception {
-      SendingResult result = new SendingResult();
+   public void send(final ProducerInfo producerInfo,
+                    final Message messageSend,
+                    boolean sendProducerAck) throws Exception {
       TransactionId tid = messageSend.getTransactionId();
       if (tid != null) {
          resetSessionTx(tid);
@@ -262,39 +248,128 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      for (ActiveMQDestination dest : actualDestinations) {
+      ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
+
+      /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
+      * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
+      * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
+      * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
+      if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
+         originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+      }
+
+      Runnable runnable;
+
+      if (sendProducerAck) {
+         runnable = new Runnable() {
+            public void run() {
+               try {
+                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                  connection.dispatchSync(ack);
+               }
+               catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                  connection.sendException(e);
+               }
+
+            }
+         };
+      }
+      else {
+         final Connection transportConnection = connection.getTransportConnection();
+
+//         new Exception("Setting to false").printStackTrace();
+
+         if (transportConnection == null) {
+            // I don't think this could happen, but just in case, avoiding races
+            runnable = null;
+         }
+         else {
+            runnable = new Runnable() {
+               public void run() {
+                  transportConnection.setAutoRead(true);
+               }
+            };
+         }
+      }
+
 
-         ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend);
+      internalSend(actualDestinations, originalCoreMsg, runnable);
+   }
+
+   private void internalSend(ActiveMQDestination[] actualDestinations,
+                             ServerMessage originalCoreMsg,
+                             final Runnable onComplete) throws Exception {
+
+      Runnable runToUse;
+
+      if (actualDestinations.length <= 1 || onComplete == null) {
+         // if onComplete is null, this will be null ;)
+         runToUse = onComplete;
+      }
+      else {
+         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
+         runToUse = new Runnable() {
+            @Override
+            public void run() {
+               if (count.decrementAndGet() == 0) {
+                  onComplete.run();
+               }
+            }
+         };
+      }
 
-         /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
-         * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
-         * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
-         * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
-         if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
-            coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+      SimpleString[] addresses = new SimpleString[actualDestinations.length];
+      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
+
+      // We fillup addresses, pagingStores and we will throw failure if that's the case
+      for (int i = 0; i < actualDestinations.length; i++) {
+         ActiveMQDestination dest = actualDestinations[i];
+         addresses[i] = OpenWireUtil.toCoreAddress(dest);
+         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
+         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
+            throw new ResourceAllocationException("Queue is full");
          }
-         SimpleString address = OpenWireUtil.toCoreAddress(dest);
-         coreMsg.setAddress(address);
+      }
+
+
+      for (int i = 0; i < actualDestinations.length; i++) {
+
+         ServerMessage coreMsg = originalCoreMsg.copy();
 
-         PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
+         coreMsg.setAddress(addresses[i]);
 
+         PagingStore store = pagingStores[i];
 
-         // TODO: Improve this, tested with ProducerFlowControlSendFailTest
          if (store.isFull()) {
-            result.setBlockNextSend(true);
-            result.setBlockPagingStore(store);
-            result.setBlockingAddress(address);
-            //now we hold this message send until the store has space.
-            //we do this by put it in a scheduled task
-            ScheduledExecutorService scheduler = server.getScheduledPool();
-            Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
-            scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
+            connection.getTransportConnection().setAutoRead(false);
          }
-         else {
-            coreSession.send(coreMsg, false);
+
+         getCoreSession().send(coreMsg, false);
+
+         if (runToUse != null) {
+            // if the timeout is >0, it will wait this much milliseconds
+            // before running the the runToUse
+            // this will eventually unblock blocked destinations
+            // playing flow control
+            store.checkMemory(runToUse);
          }
       }
-      return result;
+   }
+
+   public AMQServerSession getCoreSession() {
+      return this.coreSession;
+   }
+
+   public ActiveMQServer getCoreServer() {
+      return this.server;
+   }
+
+   public void removeConsumer(long consumerId) throws Exception {
+      boolean failed = !(this.txId != null || this.isTx);
+
+      coreSession.amqCloseConsumer(consumerId, failed);
+      consumers.remove(consumerId);
    }
 
    public WireFormat getMarshaller() {
@@ -467,92 +542,4 @@ public class AMQSession implements SessionCallback {
          }
       }
    }
-
-   private class SendRetryTask implements Runnable {
-
-      private ServerMessage coreMsg;
-      private AMQProducerBrokerExchange producerExchange;
-      private boolean sendProducerAck;
-      private int msgSize;
-      private int commandId;
-
-      public SendRetryTask(ServerMessage coreMsg,
-                           AMQProducerBrokerExchange producerExchange,
-                           boolean sendProducerAck,
-                           int msgSize,
-                           int commandId) {
-         this.coreMsg = coreMsg;
-         this.producerExchange = producerExchange;
-         this.sendProducerAck = sendProducerAck;
-         this.msgSize = msgSize;
-         this.commandId = commandId;
-      }
-
-      @Override
-      public void run() {
-         synchronized (AMQSession.this) {
-            try {
-               // check pageStore
-               SimpleString address = coreMsg.getAddress();
-               PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
-               if (store.isFull()) {
-                  // if store is still full, schedule another
-                  server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS);
-               }
-               else {
-                  // now send the message again.
-                  coreSession.send(coreMsg, false);
-
-                  if (sendProducerAck) {
-                     ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-                     ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize);
-                     connection.dispatchAsync(ack);
-                  }
-                  else {
-                     Response response = new Response();
-                     response.setCorrelationId(commandId);
-                     connection.dispatchAsync(response);
-                  }
-               }
-            }
-            catch (Exception e) {
-               ExceptionResponse response = new ExceptionResponse(e);
-               response.setCorrelationId(commandId);
-               connection.dispatchAsync(response);
-            }
-         }
-
-      }
-   }
-
-   // TODO: remove this, we should do the same as we do on core for blocking
-   public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
-                                    SendingResult result) throws IOException {
-
-
-      new Exception("blocking").printStackTrace();
-      long start = System.currentTimeMillis();
-      long nextWarn = start;
-      producerExchange.blockingOnFlowControl(true);
-
-      AMQConnectionContext context = producerExchange.getConnectionContext();
-      PagingStoreImpl store = result.getBlockPagingStore();
-
-      //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
-      long blockedProducerWarningInterval = 30000;
-      ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
-
-      while (store.isFull()) {
-         if (context.getStopping().get()) {
-            throw new IOException("Connection closed, send aborted.");
-         }
-
-         long now = System.currentTimeMillis();
-         if (now >= nextWarn) {
-            ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000));
-            nextWarn = now + blockedProducerWarningInterval;
-         }
-      }
-      producerExchange.blockingOnFlowControl(false);
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
deleted file mode 100644
index 8ab3815..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.protocol.openwire.impl;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-
-public class OpenWireServerCallback implements SessionCallback {
-
-   @Override
-   public boolean hasCredits(ServerConsumer consumerID) {
-      return false;
-   }
-
-   @Override
-   public void sendProducerCreditsMessage(int credits, SimpleString address) {
-
-   }
-
-   @Override
-   public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
-
-   }
-
-   @Override
-   public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
-      return 0;
-   }
-
-   @Override
-   public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
-      return 0;
-   }
-
-   @Override
-   public int sendLargeMessageContinuation(ServerConsumer consumerID,
-                                           byte[] body,
-                                           boolean continues,
-                                           boolean requiresResponse) {
-      return 0;
-   }
-
-   @Override
-   public void closed() {
-
-   }
-
-   @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
-
-   }
-
-   @Override
-   public boolean isWritable(ReadyListener callback) {
-      return false;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index e831966..566b91a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent {
 
    boolean checkMemory(Runnable runnable);
 
+   boolean isFull();
+
    /**
     * Write lock the PagingStore.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 70d6289..db61f89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -138,6 +138,12 @@ public class InVMConnection implements Connection {
    }
 
    @Override
+   public void setAutoRead(boolean autoRead) {
+      // nothing to be done on the INVM.
+      // maybe we could eventually implement something, but not needed now
+   }
+
+   @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
       return ActiveMQBuffers.dynamicBuffer(size);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 5a160ab..e44a490 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -170,20 +170,21 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
 
    @Test
    @BMRules(
-      rules = {@BMRule(
-         name = "set no return response",
-         targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-         targetMethod = "processCommitTransactionOnePhase",
-         targetLocation = "ENTRY",
-         binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
-         action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"),
+      rules = {
+         @BMRule(
+            name = "set no return response",
+            targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+            targetMethod = "processCommitTransactionOnePhase",
+            targetLocation = "ENTRY",
+            binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+            action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"),
 
          @BMRule(
-         name = "stop broker before commit",
-         targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
-         targetMethod = "commit",
-         targetLocation = "ENTRY",
-         action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
+            name = "stop broker before commit",
+            targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+            targetMethod = "commit",
+            targetLocation = "ENTRY",
+            action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
    public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
       doTestFailoverConsumerOutstandingSendTx(false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
index 914a8e1..1599a2c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
@@ -29,6 +29,7 @@ import javax.transaction.xa.XAResource;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -59,6 +60,30 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testProducerFlowControl() throws Exception {
+      try {
+
+         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+
+         factory.setProducerWindowSize(1024 * 64);
+
+         Connection connection = factory.createConnection();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("test"));
+
+
+         connection.close();
+
+         System.err.println("Done!!!");
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+   @Test
    public void testAutoAck() throws Exception {
       try {
 
@@ -108,6 +133,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
          MessageProducer producer = session.createProducer(queue);
          MessageConsumer consumer = session.createConsumer(queue);
          producer.send(session.createTextMessage("test"));
+         producer.send(session.createTextMessage("test2"));
          connection.start();
          Assert.assertNull(consumer.receive(1000));
          session.rollback();
@@ -130,6 +156,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest {
    }
 
 
+
    @Test
    public void testClientACK() throws Exception {
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e23c74fd/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6351357..cd08b9e 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -301,6 +301,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
          return 0;
       }
 
+      public boolean isFull() {
+         return false;
+      }
+
       @Override
       public void applySetting(AddressSettings addressSettings) {