You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:47 UTC

[65/69] [abbrv] activemq-artemis git commit: Some tweaks to the code

Some tweaks to the code


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/44a6622b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/44a6622b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/44a6622b

Branch: refs/heads/refactor-openwire
Commit: 44a6622b8a83057fb84fd4123147855751783387
Parents: 8f3db59
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 18 14:30:52 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 30 22:29:44 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  17 +--
 .../openwire/OpenWireMessageConverter.java      |   2 +-
 .../amq/AMQCompositeConsumerBrokerExchange.java |   9 +-
 .../core/protocol/openwire/amq/AMQConsumer.java | 144 ++++++++++---------
 .../core/protocol/openwire/amq/AMQSession.java  |  45 +++---
 5 files changed, 125 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 17f26b0..e8259c3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -426,16 +426,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
-   public void addConsumerBrokerExchange(ConsumerId id,
+   private void addConsumerBrokerExchange(ConsumerId id,
                                          AMQSession amqSession,
-                                         Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+                                         List<AMQConsumer> consumerList) {
       AMQConsumerBrokerExchange result = consumerExchanges.get(id);
       if (result == null) {
-         if (consumerMap.size() == 1) {
-            result = new AMQSingleConsumerBrokerExchange(amqSession, consumerMap.values().iterator().next());
+         if (consumerList.size() == 1) {
+            result = new AMQSingleConsumerBrokerExchange(amqSession, consumerList.get(0));
          }
          else {
-            result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerMap);
+            result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerList);
          }
          synchronized (consumerExchanges) {
             consumerExchanges.put(id, result);
@@ -717,9 +717,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             throw new IllegalStateException("Session not exist! : " + sessionId);
          }
 
-         amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+         List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
 
+         this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
          ss.addConsumer(info);
+         amqSession.start();
       }
    }
 
@@ -729,7 +731,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public void onSlowConsumer(ServerConsumer consumer) {
          if (consumer instanceof AMQServerConsumer) {
             AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
-            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination());
+            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination());
             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
             try {
                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
@@ -1002,7 +1004,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-         new Exception("commit").printStackTrace();
          try {
             protocolManager.commitTransactionOnePhase(info);
             TransactionId txId = info.getTransactionId();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 6176490..89f71ed 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -443,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    public static MessageDispatch createMessageDispatch(ServerMessage message,
                                                        int deliveryCount,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 7e83767..56b4b6d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessagePull;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange {
 
    private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
 
-   public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+   public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) {
       super(amqSession);
-      this.consumerMap = consumerMap;
+      this.consumerMap = new HashMap<>();
+      for (AMQConsumer consumer : consumerList) {
+         consumerMap.put(consumer.getOpenwireDestination(), consumer);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d296213..b4056fb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -49,11 +48,10 @@ import org.apache.activemq.wireformat.WireFormat;
 public class AMQConsumer {
 
    private AMQSession session;
-   private org.apache.activemq.command.ActiveMQDestination actualDest;
+   private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private long nativeId = -1;
-   private SimpleString subQueueName = null;
 
    private int prefetchSize;
    private AtomicInteger windowAvailable;
@@ -66,7 +64,7 @@ public class AMQConsumer {
                       ConsumerInfo info,
                       ScheduledExecutorService scheduledPool) {
       this.session = amqSession;
-      this.actualDest = d;
+      this.openwireDestination = d;
       this.info = info;
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
@@ -76,73 +74,38 @@ public class AMQConsumer {
       }
    }
 
-   public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
-      AMQServerSession coreSession = session.getCoreSession();
-
-      SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
-
-      nativeId = session.getCoreServer().getStorageManager().generateID();
-
-      SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
-
-      if (this.actualDest.isTopic()) {
-         String physicalName = this.actualDest.getPhysicalName();
-         if (physicalName.contains(".>")) {
-            //wildcard
-            physicalName = OpenWireUtil.convertWildcard(physicalName);
-         }
-
-         // on recreate we don't need to create queues
-         address = new SimpleString("jms.topic." + physicalName);
-         if (info.isDurable()) {
-            subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName()));
-
-            QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
-            if (result.isExists()) {
-               // Already exists
-               if (result.getConsumerCount() > 0) {
-                  throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
-               }
+   public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
+      this.nativeId = nativeId;
+      AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
+      serverConsumer.setAmqConsumer(this);
+   }
 
-               SimpleString oldFilterString = result.getFilterString();
 
-               boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+   private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
 
-               SimpleString oldTopicName = result.getAddress();
+      SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
 
-               boolean topicChanged = !oldTopicName.equals(address);
+      String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
 
-               if (selectorChanged || topicChanged) {
-                  // Delete the old durable sub
-                  coreSession.deleteQueue(subQueueName);
+      SimpleString address;
 
-                  // Create the new one
-                  coreSession.createQueue(address, subQueueName, selector, false, true);
-               }
+      if (openwireDestination.isTopic()) {
+         address = new SimpleString("jms.topic." + physicalName);
 
-            }
-            else {
-               coreSession.createQueue(address, subQueueName, selector, false, true);
-            }
-         }
-         else {
-            subQueueName = new SimpleString(UUID.randomUUID().toString());
+         SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
 
-            coreSession.createQueue(address, subQueueName, selector, true, false);
-         }
-
-         AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
-         serverConsumer.setAmqConsumer(this);
+         return serverConsumer;
       }
       else {
-         SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
-         AMQServerConsumer serverConsumer = (AMQServerConsumer)coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
-         serverConsumer.setAmqConsumer(this);
+         SimpleString queueName = new SimpleString("jms.queue." + physicalName);
+         AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+         serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
          if (addrSettings != null) {
             //see PolicyEntry
-            if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) {
+            if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
                //sends back a ConsumerControl
                ConsumerControl cc = new ConsumerControl();
                cc.setConsumerId(info.getConsumerId());
@@ -150,9 +113,63 @@ public class AMQConsumer {
                session.getConnection().dispatch(cc);
             }
          }
+
+         return serverConsumer;
+
+      }
+
+   }
+
+   private SimpleString createTopicSubscription(boolean isDurable,
+                                                String clientID,
+                                                String physicalName,
+                                                String subscriptionName,
+                                                SimpleString selector,
+                                                SimpleString address) throws Exception {
+
+      SimpleString queueName;
+
+      if (isDurable) {
+         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+         QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
+         if (result.isExists()) {
+            // Already exists
+            if (result.getConsumerCount() > 0) {
+               throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+            }
+
+            SimpleString oldFilterString = result.getFilterString();
+
+            boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+
+            SimpleString oldTopicName = result.getAddress();
+
+            boolean topicChanged = !oldTopicName.equals(address);
+
+            if (selectorChanged || topicChanged) {
+               // Delete the old durable sub
+               session.getCoreSession().deleteQueue(queueName);
+
+               // Create the new one
+               session.getCoreSession().createQueue(address, queueName, selector, false, true);
+            }
+         }
+         else {
+            session.getCoreSession().createQueue(address, queueName, selector, false, true);
+         }
+      }
+      else {
+         queueName = new SimpleString(UUID.randomUUID().toString());
+
+         session.getCoreSession().createQueue(address, queueName, selector, true, false);
+
       }
+
+      return queueName;
    }
 
+
+
    public long getNativeId() {
       return this.nativeId;
    }
@@ -200,7 +217,7 @@ public class AMQConsumer {
    public void handleDeliverNullDispatch() {
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(getId());
-      md.setDestination(actualDest);
+      md.setDestination(openwireDestination);
       session.deliverMessage(md);
       windowAvailable.decrementAndGet();
    }
@@ -351,10 +368,6 @@ public class AMQConsumer {
       }
    }
 
-   public org.apache.activemq.command.ActiveMQDestination getDestination() {
-      return actualDest;
-   }
-
    public ConsumerInfo getInfo() {
       return info;
    }
@@ -375,8 +388,8 @@ public class AMQConsumer {
       session.removeConsumer(nativeId);
    }
 
-   public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
-      return actualDest;
+   public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
+      return openwireDestination;
    }
 
    public void setPrefetchSize(int prefetchSize) {
@@ -388,6 +401,9 @@ public class AMQConsumer {
       }
    }
 
+   /**
+    * The MessagePullHandler is used with slow consumer policies.
+    * */
    private class MessagePullHandler {
 
       private long next = -1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44a6622b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 86ea582..4675dca 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import javax.jms.ResourceAllocationException;
 import javax.transaction.xa.Xid;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +44,8 @@ import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
@@ -62,6 +63,10 @@ import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
+
+   // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
+   protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+
    private ConnectionInfo connInfo;
    private AMQServerSession coreSession;
    private SessionInfo sessInfo;
@@ -98,7 +103,7 @@ public class AMQSession implements SessionCallback {
       this.connection = connection;
       this.scheduledPool = scheduledPool;
       this.manager = manager;
-      OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller();
+      OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
 
       this.converter = new OpenWireMessageConverter(marshaller.copy());
    }
@@ -130,7 +135,9 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
+   public List<AMQConsumer> createConsumer(ConsumerInfo info,
+                              AMQSession amqSession,
+                              SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();
       ActiveMQDestination[] dests = null;
@@ -140,25 +147,32 @@ public class AMQSession implements SessionCallback {
       else {
          dests = new ActiveMQDestination[]{dest};
       }
-      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
-      for (ActiveMQDestination d : dests) {
-         if (d.isQueue()) {
-            SimpleString queueName = OpenWireUtil.toCoreAddress(d);
+//      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+      List<AMQConsumer> consumersList = new java.util.LinkedList<>();
+
+      for (ActiveMQDestination openWireDest : dests) {
+         if (openWireDest.isQueue()) {
+            SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
             getCoreServer().getJMSQueueCreator().create(queueName);
          }
-         AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
+         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
 
-         consumer.init(slowConsumerDetectionListener);
-         consumerMap.put(d, consumer);
+         consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+         consumersList.add(consumer);
          consumers.put(consumer.getNativeId(), consumer);
       }
-      connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
 
-      // TODO: This is wrong. We should only start when the client starts
+      return consumersList;
+   }
+
+   public void start() {
+
       coreSession.start();
       started.set(true);
+
    }
 
+   // rename actualDest to destination
    @Override
    public void afterDelivery() throws Exception {
 
@@ -166,7 +180,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public void browserFinished(ServerConsumer consumer) {
-      AMQConsumer theConsumer = ((AMQServerConsumer)consumer).getAmqConsumer();
+      AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
       if (theConsumer != null) {
          theConsumer.browseFinished();
       }
@@ -235,7 +249,6 @@ public class AMQSession implements SessionCallback {
 
    }
 
-
    public void send(final ProducerInfo producerInfo,
                     final Message messageSend,
                     boolean sendProducerAck) throws Exception {
@@ -286,7 +299,7 @@ public class AMQSession implements SessionCallback {
       else {
          final Connection transportConnection = connection.getTransportConnection();
 
-//         new Exception("Setting to false").printStackTrace();
+         //         new Exception("Setting to false").printStackTrace();
 
          if (transportConnection == null) {
             // I don't think this could happen, but just in case, avoiding races
@@ -301,7 +314,6 @@ public class AMQSession implements SessionCallback {
          }
       }
 
-
       internalSend(actualDestinations, originalCoreMsg, runnable);
    }
 
@@ -340,7 +352,6 @@ public class AMQSession implements SessionCallback {
          }
       }
 
-
       for (int i = 0; i < actualDestinations.length; i++) {
 
          ServerMessage coreMsg = originalCoreMsg.copy();