You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/01/12 12:37:10 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6560
Repository: activemq
Updated Branches:
refs/heads/master 3a0a7238b -> bdec3f6dd
https://issues.apache.org/jira/browse/AMQ-6560
Converting flags in TransportConnection to AtomicBoolean to reduce
synchronization and improve thread safety
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bdec3f6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bdec3f6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bdec3f6d
Branch: refs/heads/master
Commit: bdec3f6ddb7f1417690f3c89d07ea77f0d6d96e5
Parents: 3a0a723
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Jan 12 07:36:16 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Jan 12 07:36:50 2017 -0500
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 38 ++++++++++----------
1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bdec3f6d/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index ac72534..afc27c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -137,8 +137,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private boolean blocked;
private boolean connected;
private boolean active;
- private boolean starting;
- private boolean pendingStop;
+ private final AtomicBoolean starting = new AtomicBoolean();
+ private final AtomicBoolean pendingStop = new AtomicBoolean();
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
@@ -224,7 +224,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void serviceTransportException(IOException e) {
- if (!stopping.get() && !pendingStop) {
+ if (!stopping.get() && !pendingStop.get()) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug(this + " failed: " + e, e);
@@ -303,7 +303,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
ConnectionError ce = new ConnectionError();
ce.setException(e);
- if (pendingStop) {
+ if (pendingStop.get()) {
dispatchSync(ce);
} else {
dispatchAsync(ce);
@@ -324,7 +324,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (brokerService.isStopping()) {
response = responseRequired ? new ExceptionResponse(
new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null;
- } else if (!pendingStop) {
+ } else if (!pendingStop.get()) {
response = command.visit(this);
} else {
response = responseRequired ? new ExceptionResponse(transportException.get()) : null;
@@ -993,7 +993,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
@Override
public boolean iterate() {
try {
- if (pendingStop || stopping.get()) {
+ if (pendingStop.get() || stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
@@ -1051,7 +1051,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void start() throws Exception {
try {
synchronized (this) {
- starting = true;
+ starting.set(true);
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
@@ -1072,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
} catch (Exception e) {
// Force clean up on an error starting up.
- pendingStop = true;
+ pendingStop.set(true);
throw e;
} finally {
// stop() can be called from within the above block,
@@ -1100,7 +1100,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void delayedStop(final int waitTime, final String reason, Throwable cause) {
if (waitTime > 0) {
synchronized (this) {
- pendingStop = true;
+ pendingStop.set(true);
transportException.set(cause);
}
try {
@@ -1129,8 +1129,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
- pendingStop = true;
- if (starting) {
+ pendingStop.set(true);
+ if (starting.get()) {
LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
return;
}
@@ -1341,8 +1341,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/**
* @return true if the Connection is starting
*/
- public synchronized boolean isStarting() {
- return starting;
+ public boolean isStarting() {
+ return starting.get();
}
@Override
@@ -1355,19 +1355,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return this.faultTolerantConnection;
}
- protected synchronized void setStarting(boolean starting) {
- this.starting = starting;
+ protected void setStarting(boolean starting) {
+ this.starting.set(starting);
}
/**
* @return true if the Connection needs to stop
*/
- public synchronized boolean isPendingStop() {
- return pendingStop;
+ public boolean isPendingStop() {
+ return pendingStop.get();
}
- protected synchronized void setPendingStop(boolean pendingStop) {
- this.pendingStop = pendingStop;
+ protected void setPendingStop(boolean pendingStop) {
+ this.pendingStop.set(pendingStop);
}
private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {