You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/24 21:46:15 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1069 Fix some thread
unsafe usages of proton
Repository: activemq-artemis
Updated Branches:
refs/heads/master 52a2c5dd0 -> f2e0891b0
ARTEMIS-1069 Fix some thread unsafe usages of proton
Unsafe modification of proton resources outside the connection lock
could lead to corruption in the transport work list and other internal
state data.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/40b9ac0a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/40b9ac0a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/40b9ac0a
Branch: refs/heads/master
Commit: 40b9ac0a20390731314bb59fb755b9e435167bcd
Parents: 52a2c5d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 24 17:22:14 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 24 17:46:00 2017 -0400
----------------------------------------------------------------------
.../amqp/proton/AMQPConnectionContext.java | 27 ++++++++++++--------
.../amqp/proton/AMQPSessionContext.java | 22 +++++++++++-----
.../proton/ProtonServerReceiverContext.java | 10 ++++----
3 files changed, 37 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index d6cab99..25c4b56 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
@@ -50,12 +56,6 @@ import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
-
public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -210,7 +210,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
} else {
Sender sender = (Sender) link;
protonSession.addSender(sender);
- sender.offer(1);
}
}
@@ -421,8 +420,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteClose(Link link) throws Exception {
- link.close();
- link.free();
+ synchronized (getLock()) {
+ link.close();
+ link.free();
+ }
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
@@ -431,8 +432,12 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteDetach(Link link) throws Exception {
- link.detach();
- link.free();
+ synchronized (getLock()) {
+ link.detach();
+ link.free();
+ }
+
+ flush();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index ccc4a6c..64b2531 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -147,8 +147,10 @@ public class AMQPSessionContext extends ProtonInitializable {
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
- receiver.open();
- receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+ synchronized (connection.getLock()) {
+ receiver.open();
+ receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+ }
}
public void addSender(Sender sender) throws Exception {
@@ -161,13 +163,17 @@ public class AMQPSessionContext extends ProtonInitializable {
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
- sender.open();
+ synchronized (connection.getLock()) {
+ sender.open();
+ }
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
- sender.close();
+ synchronized (connection.getLock()) {
+ sender.close();
+ }
}
}
@@ -185,12 +191,16 @@ public class AMQPSessionContext extends ProtonInitializable {
protonReceiver.initialise();
receivers.put(receiver, protonReceiver);
receiver.setContext(protonReceiver);
- receiver.open();
+ synchronized (connection.getLock()) {
+ receiver.open();
+ }
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
- receiver.close();
+ synchronized (connection.getLock()) {
+ receiver.close();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/40b9ac0a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 54467cf..34a522f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -52,7 +52,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
- */
+ */
private static int maxCreditAllocation = 100;
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
@@ -170,8 +170,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(e.getMessage());
rejected.setError(condition);
- delivery.disposition(rejected);
- delivery.settle();
+ synchronized (connection.getLock()) {
+ delivery.disposition(rejected);
+ delivery.settle();
+ }
}
}
@@ -204,7 +206,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
connection.flush();
}
-
}
public void drain(int credits) {
@@ -221,5 +222,4 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public boolean isDraining() {
return receiver.draining();
}
-
}
[2/2] activemq-artemis git commit: This closes #1132
Posted by cl...@apache.org.
This closes #1132
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f2e0891b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f2e0891b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f2e0891b
Branch: refs/heads/master
Commit: f2e0891b0ee1d4b7083635b9b246e14f707e4e1f
Parents: 52a2c5d 40b9ac0
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 24 17:46:01 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 24 17:46:01 2017 -0400
----------------------------------------------------------------------
.../amqp/proton/AMQPConnectionContext.java | 27 ++++++++++++--------
.../amqp/proton/AMQPSessionContext.java | 22 +++++++++++-----
.../proton/ProtonServerReceiverContext.java | 10 ++++----
3 files changed, 37 insertions(+), 22 deletions(-)
----------------------------------------------------------------------