You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/05/18 06:42:41 UTC

[iotdb] branch DoubleWrite created (now f425e7d)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch DoubleWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f425e7d  fix conflicts

This branch includes the following new commits:

     new f425e7d  fix conflicts

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix conflicts

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch DoubleWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f425e7d5c4645d9efa4f48b4191c3436a7adc74c
Merge: b8650ca 99822c7
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue May 18 14:38:09 2021 +0800

    fix conflicts

 .github/workflows/client-go.yml                    |   3 +
 .github/workflows/client.yml                       |   5 +-
 .github/workflows/e2e.yml                          |   3 +
 .github/workflows/main-unix.yml                    |   3 +
 .github/workflows/main-win.yml                     |   3 +
 .github/workflows/sonar-coveralls.yml              |   3 +
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   6 +-
 cli/pom.xml                                        |   2 +-
 .../org/apache/iotdb/tool/AbstractCsvTool.java     |  40 +--
 .../main/java/org/apache/iotdb/tool/ExportCsv.java |   2 +-
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  56 ++--
 client-cpp/src/main/Session.cpp                    |  11 +-
 client-cpp/src/main/Session.h                      |  16 +-
 cluster/pom.xml                                    |  12 +-
 .../resources/conf/iotdb-cluster.properties        |  60 ++--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  20 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |   5 -
 .../iotdb/cluster/log/applier/BaseApplier.java     |  36 ++-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  48 +--
 .../iotdb/cluster/log/catchup/LogCatchUpTask.java  |   4 +-
 .../cluster/log/manage/CommittedEntryManager.java  |  12 +-
 .../serializable/SyncLogDequeSerializer.java       |   4 +-
 .../query/last/ClusterLastQueryExecutor.java       |   1 +
 .../iotdb/cluster/server/DataClusterServer.java    |   6 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |   6 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  77 +++--
 .../cluster/server/service/BaseAsyncService.java   |  19 +-
 .../cluster/server/service/BaseSyncService.java    |  23 +-
 .../cluster/client/async/AsyncClientPoolTest.java  |  37 ++-
 .../cluster/client/async/AsyncDataClientTest.java  |  18 ++
 .../client/async/AsyncDataHeartbeatClientTest.java |  18 ++
 .../cluster/client/async/AsyncMetaClientTest.java  |  18 ++
 .../client/async/AsyncMetaHeartbeatClientTest.java |  18 ++
 .../iotdb/cluster/log/LogDispatcherTest.java       |  10 +-
 .../cluster/log/applier/DataLogApplierTest.java    |  35 +++
 .../cluster/log/snapshot/DataSnapshotTest.java     |   8 +
 .../cluster/partition/SlotPartitionTableTest.java  |   2 +-
 .../query/ClusterDataQueryExecutorTest.java        |   3 -
 .../cluster/server/member/DataGroupMemberTest.java |   5 +-
 .../cluster/server/member/RaftMemberTest.java      |   9 +-
 code-coverage/pom.xml                              |   2 +-
 compile-tools/pom.xml                              |   8 +-
 .../Administration-Management/Administration.md    |   7 +-
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    |  47 +++
 docs/UserGuide/Cluster/Cluster-Setup.md            |  11 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 .../Administration-Management/Administration.md    |   3 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |  46 +++
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  11 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   4 +-
 .../DDL-Data-Definition-Language.md                |   2 +-
 example/hadoop/pom.xml                             |   3 +-
 example/udf/pom.xml                                |   2 +-
 hadoop/pom.xml                                     |   2 +-
 hive-connector/pom.xml                             |   5 +-
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |   9 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  14 +-
 pom.xml                                            |   3 +-
 .../resources/conf/iotdb-engine.properties         | 279 ++++++++--------
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  26 +-
 .../apache/iotdb/db/auth/entity/PrivilegeType.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  39 ++-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  28 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  27 +-
 .../iotdb/db/doublewrite/DoubleWriteConsumer.java  |  14 +-
 .../iotdb/db/doublewrite/DoubleWriteProducer.java  |   2 +-
 .../iotdb/db/doublewrite/DoubleWriteType.java      |  18 ++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  46 ++-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |   2 +-
 .../compaction/CompactionMergeTaskPoolManager.java |  38 ++-
 .../db/engine/compaction/TsFileManagement.java     |  58 +++-
 .../level/LevelCompactionTsFileManagement.java     | 349 ++++++++++++---------
 .../no/NoCompactionTsFileManagement.java           | 132 ++++----
 .../engine/compaction/utils/CompactionUtils.java   |  25 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  11 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   4 +
 .../db/engine/merge/manage/MergeResource.java      |   2 +-
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  14 +-
 .../db/engine/storagegroup/StorageGroupInfo.java   |   6 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 118 ++++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  50 +--
 .../engine/storagegroup/TsFileProcessorInfo.java   |   6 +-
 .../db/engine/storagegroup/TsFileResource.java     |  11 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  18 --
 .../storagegroup/timeindex/FileTimeIndex.java      |   5 -
 .../engine/storagegroup/timeindex/ITimeIndex.java  |   8 -
 .../iotdb/db/engine/upgrade/UpgradeTask.java       |  40 +++
 .../org/apache/iotdb/db/metadata/MManager.java     |   6 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  15 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  10 -
 .../physical/BatchPlan.java}                       |  46 ++-
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  48 ++-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  42 ++-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |  49 ++-
 .../db/qp/physical/crud/InsertTabletPlan.java      |  24 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  39 ++-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  71 ++++-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   9 +
 .../iotdb/db/query/executor/LastQueryExecutor.java |  41 ++-
 .../iotdb/db/query/executor/QueryRouter.java       |   6 +-
 .../row/ElasticSerializableRowRecordList.java      |  13 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 230 +++++++-------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  18 +-
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   |  15 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  16 +
 .../java/org/apache/iotdb/db/utils/AuthUtils.java  |   2 -
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |   4 +-
 .../org/apache/iotdb/db/utils/UpgradeUtils.java    |  10 -
 .../apache/iotdb/db/auth/AuthorityCheckerTest.java |  23 +-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |   6 +-
 .../db/engine/compaction/CompactionChunkTest.java  |   8 +-
 .../compaction/LevelCompactionCacheTest.java       |   5 +-
 .../engine/compaction/LevelCompactionLogTest.java  |   5 +-
 .../compaction/LevelCompactionMergeTest.java       |  78 ++++-
 .../engine/compaction/LevelCompactionModsTest.java |  45 ++-
 .../compaction/LevelCompactionMoreDataTest.java    |   5 +-
 .../LevelCompactionTsFileManagementTest.java       |  69 ++++
 .../NoCompactionTsFileManagementTest.java          |  75 ++++-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  75 +++++
 .../engine/storagegroup/TsFileProcessorTest.java   |   8 +-
 .../iotdb/db/integration/IOTDBGroupByIT.java       |  19 ++
 .../iotdb/db/integration/IoTDBGroupByFillIT.java   |  22 ++
 .../iotdb/db/integration/IoTDBGroupByMonthIT.java  |  98 +++++-
 ...oTDBLoadExternalTsFileWithTimePartitionIT.java} |  44 ++-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |  36 +++
 .../integration/IoTDBUDTFAlignByTimeQueryIT.java   |  16 +
 .../aggregation/IoTDBAggregationByLevelIT.java     |  19 ++
 .../db/integration/auth/IoTDBAuthorizationIT.java  |  72 +++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |  71 +++++
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |   6 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |  57 ++++
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |  20 +-
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |  10 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   4 +-
 .../rpc/TimeoutChangeableTFastFramedTransport.java |   8 +-
 .../TimeoutChangeableTSnappyFramedTransport.java   |  20 +-
 .../java/org/apache/iotdb/session/SessionUT.java   |   2 +-
 site/src/main/.vuepress/config.js                  |  12 +-
 spark-tsfile/pom.xml                               |   2 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |   8 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  21 +-
 .../tsfile/read/filter/GroupByMonthFilter.java     |  17 +-
 .../tsfile/v2/file/metadata/TsFileMetadataV2.java  |   9 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |  16 +-
 145 files changed, 2795 insertions(+), 1159 deletions(-)

diff --cc server/src/assembly/resources/conf/iotdb-engine.properties
index 4ae5d52,123db71..5d15a9b
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@@ -643,29 -654,21 +654,31 @@@ timestamp_precision=m
  # index_root_dir=data/index
  
  # Is index enable
- enable_index=false
+ # enable_index=false
  
  # How many threads can concurrently build index. When <= 0, use CPU core number.
- concurrent_index_build_thread=0
+ # concurrent_index_build_thread=0
  
  # the default size of sliding window used for the subsequence matching in index framework
- default_index_window_range=10
+ # default_index_window_range=10
  
  # buffer parameter for index processor.
- index_buffer_size=134217728
+ # index_buffer_size=134217728
  
  # whether enable data partition. If disabled, all data belongs to partition 0
- enable_partition=false
+ # enable_partition=false
  
  # time range for partitioning data inside each storage group, the unit is second
- partition_interval=604800
+ # partition_interval=604800
+ 
+ # concurrent_writing_time_partition=1
 +
 +####################
 +### Double Write Configuration
 +####################
 +
 +# enable_double_write=false
 +# secondary_address=127.0.0.1
 +# secondary_port=6668
 +# secondary_user=root
- # secondary_password=root
++# secondary_password=root
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
index b5b3ace,0000000..638cb58
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
@@@ -1,181 -1,0 +1,173 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.doublewrite;
 +
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.rpc.*;
 +import org.apache.iotdb.service.rpc.thrift.*;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TBinaryProtocol;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.time.ZoneId;
 +import java.util.concurrent.BlockingQueue;
 +
 +public class DoubleWriteConsumer implements Runnable {
 +  private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteConsumer.class);
-   private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
++  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
 +  private TSIService.Iface doubleWriteClient;
 +  private TTransport transport;
 +  private long sessionId;
-   private long consumerCnt = 0;
-   private long consumerTime = 0;
 +
 +  public DoubleWriteConsumer(
 +      BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) {
 +    this.doubleWriteQueue = doubleWriteQueue;
 +    init();
 +  }
 +
 +  @Override
 +  public void run() {
 +    try {
 +      while (true) {
-         long startTime = System.nanoTime();
 +        Pair<DoubleWriteType, TSInsertRecordsReq> head = doubleWriteQueue.take();
 +        if (head.left == DoubleWriteType.DOUBLE_WRITE_END) {
 +          break;
 +        }
 +        switch (head.left) {
 +          case TSInsertRecordsReq:
 +            TSInsertRecordsReq tsInsertRecordsReq = head.right;
 +            try {
 +              RpcUtils.verifySuccessWithRedirection(
 +                  doubleWriteClient.insertRecords(tsInsertRecordsReq));
 +            } catch (TException e) {
 +              if (reconnect()) {
 +                try {
 +                  RpcUtils.verifySuccess(doubleWriteClient.insertRecords(tsInsertRecordsReq));
 +                } catch (TException tException) {
 +                  throw new IoTDBConnectionException(tException);
 +                }
 +              } else {
 +                throw new IoTDBConnectionException(
 +                    "Fail to reconnect to server. Please check server status");
 +              }
 +            }
 +            break;
++          default:
++            throw new UnsupportedOperationException(String.valueOf(head.left));
 +        }
-         consumerCnt += 1;
-         long endTime = System.nanoTime();
-         consumerTime += endTime - startTime;
 +      }
 +
 +      TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
 +      try {
 +        doubleWriteClient.closeSession(req);
 +      } catch (TException e) {
 +        throw new IoTDBConnectionException(
 +            "Error occurs when closing session at server. Maybe server is down.", e);
 +      } finally {
 +        if (transport != null) {
 +          transport.close();
 +        }
 +      }
 +    } catch (RedirectException
 +        | StatementExecutionException
 +        | InterruptedException
 +        | IoTDBConnectionException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +
-   public double getEfficiency() {
-     return (double) consumerCnt / (double) consumerTime * 1000000000.0;
-   }
- 
 +  private boolean reconnect() {
 +    boolean flag = false;
 +    for (int i = 1; i <= 3; i++) {
 +      try {
 +        if (transport != null) {
 +          close();
 +          init();
 +          flag = true;
 +        }
 +      } catch (Exception e) {
 +        try {
 +          Thread.sleep(1000);
 +        } catch (InterruptedException e1) {
 +          LOGGER.error("reconnect is interrupted.", e1);
 +          Thread.currentThread().interrupt();
 +        }
 +      }
 +    }
 +    return flag;
 +  }
 +
 +  private void close() throws IoTDBConnectionException {
 +    TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
 +    try {
 +      doubleWriteClient.closeSession(req);
 +    } catch (TException e) {
 +      throw new IoTDBConnectionException(
 +          "Error occurs when closing session at server. Maybe server is down.", e);
 +    } finally {
 +      if (transport != null) {
 +        transport.close();
 +      }
 +    }
 +  }
 +
 +  private void init() {
 +    try {
 +      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 +      RpcTransportFactory.setDefaultBufferCapacity(config.getThriftDefaultBufferSize());
 +      EndPoint endPoint = new EndPoint(config.getSecondaryAddress(), config.getSecondaryPort());
 +      RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
 +
 +      transport =
 +          RpcTransportFactory.INSTANCE.getTransport(
 +              new TSocket(endPoint.getIp(), endPoint.getPort(), 0));
 +      try {
 +        transport.open();
 +      } catch (TTransportException e) {
 +        throw new IoTDBConnectionException(e);
 +      }
 +
 +      doubleWriteClient = new TSIService.Client(new TBinaryProtocol(transport));
 +      doubleWriteClient = RpcUtils.newSynchronizedClient(doubleWriteClient);
 +
 +      TSOpenSessionReq openReq = new TSOpenSessionReq();
 +      openReq.setUsername(config.getSecondaryUser());
 +      openReq.setPassword(config.getSecondaryPassword());
 +      openReq.setZoneId(ZoneId.systemDefault().toString());
 +
 +      try {
 +        TSOpenSessionResp openResp = doubleWriteClient.openSession(openReq);
 +        RpcUtils.verifySuccess(openResp.getStatus());
 +        sessionId = openResp.getSessionId();
 +      } catch (Exception e) {
 +        transport.close();
 +        throw new IoTDBConnectionException(e);
 +      }
 +    } catch (IoTDBConnectionException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
index 6c44398,0000000..0c4e31a
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
@@@ -1,41 -1,0 +1,41 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.doublewrite;
 +
 +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 +import org.apache.iotdb.tsfile.utils.Pair;
 +
 +import java.util.concurrent.BlockingQueue;
 +
 +public class DoubleWriteProducer {
-   private BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
++  private final BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue;
 +
 +  public DoubleWriteProducer(
 +      BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue) {
 +    this.doubleWriteQueue = doubleWriteQueue;
 +  }
 +
 +  public void put(Pair<DoubleWriteType, TSInsertRecordsReq> reqPair) {
 +    try {
 +      doubleWriteQueue.put(reqPair);
 +    } catch (InterruptedException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
index e6dc12e,0000000..838fadc
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteType.java
@@@ -1,9 -1,0 +1,27 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
 +package org.apache.iotdb.db.doublewrite;
 +
 +public enum DoubleWriteType {
 +  TSInsertRecordReq,
 +  TSInsertRecordsReq,
 +  TSInsertRecordsOfOneDeviceReq,
 +  TSInsertStringRecordsReq,
 +  DOUBLE_WRITE_END
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 5a3a162,8b6f2ec..ae267c2
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@@ -190,8 -193,6 +190,7 @@@ public class TSServiceImpl implements T
    // Record the username for every rpc connection (session).
    private final Map<Long, String> sessionIdUsernameMap = new ConcurrentHashMap<>();
    private final Map<Long, ZoneId> sessionIdZoneIdMap = new ConcurrentHashMap<>();
 +  private final Map<Long, DoubleWriteProducer> sessionIdProducerMap = new ConcurrentHashMap<>();
-   private final Map<Long, DoubleWriteConsumer> sessionIdConsumerMap = new ConcurrentHashMap<>();
  
    // The sessionId is unique in one IoTDB instance.
    private final AtomicLong sessionIdGenerator = new AtomicLong();
@@@ -281,17 -282,6 +280,16 @@@
            IoTDBConstant.GLOBAL_DB_NAME,
            tsStatus.message,
            req.getUsername());
 +
 +      // if open double write
 +      if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
 +        BlockingQueue<Pair<DoubleWriteType, TSInsertRecordsReq>> doubleWriteQueue =
 +            new LinkedBlockingQueue<>();
 +        DoubleWriteProducer doubleWriteProducer = new DoubleWriteProducer(doubleWriteQueue);
 +        DoubleWriteConsumer doubleWriteConsumer = new DoubleWriteConsumer(doubleWriteQueue);
 +        new Thread(doubleWriteConsumer).start();
 +        sessionIdProducerMap.put(sessionId, doubleWriteProducer);
-         sessionIdConsumerMap.put(sessionId, doubleWriteConsumer);
 +      }
      } else {
        tsStatus =
            RpcUtils.getStatus(
@@@ -323,13 -313,6 +321,12 @@@
        }
      }
  
 +    // if open double write
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite()) {
 +      sessionIdProducerMap.get(sessionId).put(new Pair<>(DoubleWriteType.DOUBLE_WRITE_END, null));
 +      sessionIdProducerMap.remove(sessionId);
-       sessionIdConsumerMap.remove(sessionId);
 +    }
 +
      return new TSStatus(
          sessionIdUsernameMap.remove(sessionId) == null
              ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR)