You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/04/16 07:32:49 UTC
[incubator-iotdb] branch disable_mqtt_server updated: add
AtomicBoolean to double check whether stopped is visable
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch disable_mqtt_server
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/disable_mqtt_server by this push:
new 09f56bd add AtomicBoolean to double check whether stopped is visable
09f56bd is described below
commit 09f56bd3f4326e159ab2738b5d56977f891ec1cc
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Apr 16 15:32:33 2020 +0800
add AtomicBoolean to double check whether stopped is visable
---
.../thrift/server/CustomizedTThreadPoolServer.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/thrift/server/CustomizedTThreadPoolServer.java b/server/src/main/java/org/apache/thrift/server/CustomizedTThreadPoolServer.java
index 5d6da67..70eb037 100644
--- a/server/src/main/java/org/apache/thrift/server/CustomizedTThreadPoolServer.java
+++ b/server/src/main/java/org/apache/thrift/server/CustomizedTThreadPoolServer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
@@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class CustomizedTThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
- private volatile boolean myStop = true;
+ private volatile AtomicBoolean stoppedCheck = new AtomicBoolean(true);
public static class Args extends AbstractServerArgs<Args> {
public int minWorkerThreads = 5;
@@ -162,9 +163,9 @@ public class CustomizedTThreadPoolServer extends TServer {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
- LOGGER.info("set myStop to false");
- myStop = false;
- super.stopped_ = false;
+ LOGGER.info("set stopped_ to false");
+ stoppedCheck.set(false);
+ stopped_ = false;
setServing(true);
return true;
@@ -183,7 +184,7 @@ public class CustomizedTThreadPoolServer extends TServer {
protected void execute() {
int failureCount = 0;
- while (!this.myStop) {
+ while (!this.stopped_) {
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
@@ -228,9 +229,11 @@ public class CustomizedTThreadPoolServer extends TServer {
}
}
} catch (TTransportException ttx) {
- if (!myStop) {
+ if (!stopped_) {
++failureCount;
- LOGGER.warn("Stopped: {}. Transport error occurred during acceptance of message.", myStop, ttx);
+ LOGGER.warn("Stopped: {}. Transport error occurred during acceptance of message.", stopped_, ttx);
+ stopped_ = stoppedCheck.get();
+ LOGGER.info("update Stopped to {}.", stopped_);
}
}
}
@@ -258,8 +261,8 @@ public class CustomizedTThreadPoolServer extends TServer {
}
public void stop() {
- LOGGER.info("set myStop to true");
- this.myStop = true;
+ LOGGER.info("set stopped_ to true");
+ stoppedCheck.set(true);
super.stopped_ = true;
serverTransport_.interrupt();
}
@@ -312,7 +315,7 @@ public class CustomizedTThreadPoolServer extends TServer {
eventHandler.processContext(connectionContext, inputTransport, outputTransport);
}
- if (myStop) {
+ if (stopped_) {
break;
}
processor.process(inputProtocol, outputProtocol);