You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/17 13:46:21 UTC
[iotdb] branch DoubleWrite updated: format
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DoubleWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/DoubleWrite by this push:
new e0c92ad format
e0c92ad is described below
commit e0c92adc7d953e10e652d8729e75b4b9683625df
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon May 17 21:45:31 2021 +0800
format
---
.../iotdb/db/doublewrite/DoubleWriteConsumer.java | 28 +++++++---------------
.../iotdb/db/doublewrite/DoubleWriteProducer.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 3 ---
3 files changed, 10 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
index 50ce369..75551d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@ -30,12 +30,10 @@ import org.apache.thrift.transport.TTransport;
import java.util.concurrent.BlockingQueue;
public class DoubleWriteConsumer implements Runnable {
- private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
- private TSIService.Iface doubleWriteClient;
- private TTransport transport;
- private long sessionId;
- private long consumerCnt = 0;
- private long consumerTime = 0;
+ private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
+ private final TSIService.Iface doubleWriteClient;
+ private final TTransport transport;
+ private final long sessionId;
public DoubleWriteConsumer(
BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue,
@@ -52,20 +50,16 @@ public class DoubleWriteConsumer implements Runnable {
public void run() {
try {
while (true) {
- long startTime = System.nanoTime();
Pair<DoubleWriteType, TSInsertRecordsReq> head = doubleWriteQueue.take();
if (head.left == DoubleWriteType.DOUBLE_WRITE_END) {
break;
}
- switch (head.left) {
- case TSInsertRecordsReq:
- TSInsertRecordsReq tsInsertRecordsReq = head.right;
- doubleWriteClient.insertRecords(tsInsertRecordsReq);
- break;
+ if (head.left == DoubleWriteType.TSInsertRecordsReq) {
+ TSInsertRecordsReq tsInsertRecordsReq = head.right;
+ doubleWriteClient.insertRecords(tsInsertRecordsReq);
+ } else {
+ throw new UnsupportedOperationException(String.valueOf(head.left));
}
- consumerCnt += 1;
- long endTime = System.nanoTime();
- consumerTime += endTime - startTime;
}
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
@@ -83,8 +77,4 @@ public class DoubleWriteConsumer implements Runnable {
e.printStackTrace();
}
}
-
- public double getEfficiency() {
- return (double) consumerCnt / (double) consumerTime * 1000000000.0;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
index 6c44398..0c4e31a 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.util.concurrent.BlockingQueue;
public class DoubleWriteProducer {
- private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
+ private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
public DoubleWriteProducer(
BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79c5f18..37c4171 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -195,7 +195,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
private final Map<Long, DoubleWriteProducer> sessionIdProducerMap = new ConcurrentHashMap<>();
- private final Map<Long, DoubleWriteConsumer> sessionIdConsumerMap = new ConcurrentHashMap<>();
// The sessionId is unique in one IoTDB instance.
private final AtomicLong sessionIdGenerator = new AtomicLong();
@@ -294,7 +293,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
DoubleWriteConsumer doubleWriteConsumer = getDoubleWriteConsumer(doubleWriteQueue);
new Thread(doubleWriteConsumer).start();
sessionIdProducerMap.put(sessionId, doubleWriteProducer);
- sessionIdConsumerMap.put(sessionId, doubleWriteConsumer);
}
} else {
tsStatus =
@@ -374,7 +372,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
sessionIdProducerMap.get(sessionId).put(new Pair<>(DoubleWriteType.DOUBLE_WRITE_END, null));
sessionIdProducerMap.remove(sessionId);
- sessionIdConsumerMap.remove(sessionId);
}
return new TSStatus(