You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/17 13:46:21 UTC

[iotdb] branch DoubleWrite updated: format

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DoubleWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/DoubleWrite by this push:
     new e0c92ad  format
e0c92ad is described below

commit e0c92adc7d953e10e652d8729e75b4b9683625df
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Mon May 17 21:45:31 2021 +0800

    format
---
 .../iotdb/db/doublewrite/DoubleWriteConsumer.java  | 28 +++++++---------------
 .../iotdb/db/doublewrite/DoubleWriteProducer.java  |  2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  3 ---
 3 files changed, 10 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
index 50ce369..75551d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@ -30,12 +30,10 @@ import org.apache.thrift.transport.TTransport;
 import java.util.concurrent.BlockingQueue;
 
 public class DoubleWriteConsumer implements Runnable {
-  private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
-  private TSIService.Iface doubleWriteClient;
-  private TTransport transport;
-  private long sessionId;
-  private long consumerCnt = 0;
-  private long consumerTime = 0;
+  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
+  private final TSIService.Iface doubleWriteClient;
+  private final TTransport transport;
+  private final long sessionId;
 
   public DoubleWriteConsumer(
       BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue,
@@ -52,20 +50,16 @@ public class DoubleWriteConsumer implements Runnable {
   public void run() {
     try {
       while (true) {
-        long startTime = System.nanoTime();
         Pair<DoubleWriteType, TSInsertRecordsReq> head = doubleWriteQueue.take();
         if (head.left == DoubleWriteType.DOUBLE_WRITE_END) {
           break;
         }
-        switch (head.left) {
-          case TSInsertRecordsReq:
-            TSInsertRecordsReq tsInsertRecordsReq = head.right;
-            doubleWriteClient.insertRecords(tsInsertRecordsReq);
-            break;
+        if (head.left == DoubleWriteType.TSInsertRecordsReq) {
+          TSInsertRecordsReq tsInsertRecordsReq = head.right;
+          doubleWriteClient.insertRecords(tsInsertRecordsReq);
+        } else {
+          throw new UnsupportedOperationException(String.valueOf(head.left));
         }
-        consumerCnt += 1;
-        long endTime = System.nanoTime();
-        consumerTime += endTime - startTime;
       }
 
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
@@ -83,8 +77,4 @@ public class DoubleWriteConsumer implements Runnable {
       e.printStackTrace();
     }
   }
-
-  public double getEfficiency() {
-    return (double) consumerCnt / (double) consumerTime * 1000000000.0;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
index 6c44398..0c4e31a 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import java.util.concurrent.BlockingQueue;
 
 public class DoubleWriteProducer {
-  private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
+  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
 
   public DoubleWriteProducer(
       BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 79c5f18..37c4171 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -195,7 +195,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
   private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
   private final Map<Long, DoubleWriteProducer> sessionIdProducerMap = new ConcurrentHashMap<>();
-  private final Map<Long, DoubleWriteConsumer> sessionIdConsumerMap = new ConcurrentHashMap<>();
 
   // The sessionId is unique in one IoTDB instance.
   private final AtomicLong sessionIdGenerator = new AtomicLong();
@@ -294,7 +293,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         DoubleWriteConsumer doubleWriteConsumer = getDoubleWriteConsumer(doubleWriteQueue);
         new Thread(doubleWriteConsumer).start();
         sessionIdProducerMap.put(sessionId, doubleWriteProducer);
-        sessionIdConsumerMap.put(sessionId, doubleWriteConsumer);
       }
     } else {
       tsStatus =
@@ -374,7 +372,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
       sessionIdProducerMap.get(sessionId).put(new Pair<>(DoubleWriteType.DOUBLE_WRITE_END, null));
       sessionIdProducerMap.remove(sessionId);
-      sessionIdConsumerMap.remove(sessionId);
     }
 
     return new TSStatus(