You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/24 21:46:15 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1069 Fix some thread unsafe usages of proton

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 52a2c5dd0 -> f2e0891b0


ARTEMIS-1069 Fix some thread unsafe usages of proton

Modifying Proton resources without holding the connection lock is not
thread safe and could corrupt the transport work list and other internal
state data.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/40b9ac0a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/40b9ac0a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/40b9ac0a

Branch: refs/heads/master
Commit: 40b9ac0a20390731314bb59fb755b9e435167bcd
Parents: 52a2c5d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 24 17:22:14 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 24 17:46:00 2017 -0400

----------------------------------------------------------------------
 .../amqp/proton/AMQPConnectionContext.java      | 27 ++++++++++++--------
 .../amqp/proton/AMQPSessionContext.java         | 22 +++++++++++-----
 .../proton/ProtonServerReceiverContext.java     | 10 ++++----
 3 files changed, 37 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index d6cab99..25c4b56 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -50,12 +56,6 @@ import org.jboss.logging.Logger;
 
 import io.netty.buffer.ByteBuf;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
-
 public class AMQPConnectionContext extends ProtonInitializable {
 
    private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -210,7 +210,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
       } else {
          Sender sender = (Sender) link;
          protonSession.addSender(sender);
-         sender.offer(1);
       }
    }
 
@@ -421,8 +420,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
       @Override
       public void onRemoteClose(Link link) throws Exception {
-         link.close();
-         link.free();
+         synchronized (getLock()) {
+            link.close();
+            link.free();
+         }
          ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
          if (linkContext != null) {
             linkContext.close(true);
@@ -431,8 +432,12 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
       @Override
       public void onRemoteDetach(Link link) throws Exception {
-         link.detach();
-         link.free();
+         synchronized (getLock()) {
+            link.detach();
+            link.free();
+         }
+
+         flush();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index ccc4a6c..64b2531 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -147,8 +147,10 @@ public class AMQPSessionContext extends ProtonInitializable {
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 
       receiver.setContext(transactionHandler);
-      receiver.open();
-      receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+      synchronized (connection.getLock()) {
+         receiver.open();
+         receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+      }
    }
 
    public void addSender(Sender sender) throws Exception {
@@ -161,13 +163,17 @@ public class AMQPSessionContext extends ProtonInitializable {
          senders.put(sender, protonSender);
          serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
          sender.setContext(protonSender);
-         sender.open();
+         synchronized (connection.getLock()) {
+            sender.open();
+         }
          protonSender.start();
       } catch (ActiveMQAMQPException e) {
          senders.remove(sender);
          sender.setSource(null);
          sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         sender.close();
+         synchronized (connection.getLock()) {
+            sender.close();
+         }
       }
    }
 
@@ -185,12 +191,16 @@ public class AMQPSessionContext extends ProtonInitializable {
          protonReceiver.initialise();
          receivers.put(receiver, protonReceiver);
          receiver.setContext(protonReceiver);
-         receiver.open();
+         synchronized (connection.getLock()) {
+            receiver.open();
+         }
       } catch (ActiveMQAMQPException e) {
          receivers.remove(receiver);
          receiver.setTarget(null);
          receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         receiver.close();
+         synchronized (connection.getLock()) {
+            receiver.close();
+         }
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 54467cf..34a522f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -52,7 +52,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
-     */
+    */
    private static int maxCreditAllocation = 100;
 
    // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
@@ -170,8 +170,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          condition.setCondition(Symbol.valueOf("failed"));
          condition.setDescription(e.getMessage());
          rejected.setError(condition);
-         delivery.disposition(rejected);
-         delivery.settle();
+         synchronized (connection.getLock()) {
+            delivery.disposition(rejected);
+            delivery.settle();
+         }
       }
    }
 
@@ -204,7 +206,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          }
          connection.flush();
       }
-
    }
 
    public void drain(int credits) {
@@ -221,5 +222,4 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    public boolean isDraining() {
       return receiver.draining();
    }
-
 }


[2/2] activemq-artemis git commit: This closes #1132

Posted by cl...@apache.org.
This closes #1132


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f2e0891b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f2e0891b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f2e0891b

Branch: refs/heads/master
Commit: f2e0891b0ee1d4b7083635b9b246e14f707e4e1f
Parents: 52a2c5d 40b9ac0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 24 17:46:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 24 17:46:01 2017 -0400

----------------------------------------------------------------------
 .../amqp/proton/AMQPConnectionContext.java      | 27 ++++++++++++--------
 .../amqp/proton/AMQPSessionContext.java         | 22 +++++++++++-----
 .../proton/ProtonServerReceiverContext.java     | 10 ++++----
 3 files changed, 37 insertions(+), 22 deletions(-)
----------------------------------------------------------------------