You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/14 03:49:37 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] Support MQTT operation sync (#6666)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 1a9501642b [To rel/0.13] Support MQTT operation sync (#6666)
1a9501642b is described below

commit 1a9501642b3fad2cce51fe589b236409f106f273
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Jul 14 11:49:32 2022 +0800

    [To rel/0.13] Support MQTT operation sync (#6666)
---
 .../iotdb/db/integration/IoTDBRestartIT.java       |   2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 109 ++++++++++++++++++++-
 .../iotdb/db/protocol/mqtt/PublishHandler.java     |   9 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 104 +-------------------
 4 files changed, 119 insertions(+), 105 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 63cc8403c2..c6984d9f23 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -442,7 +442,7 @@ public class IoTDBRestartIT {
           Assert.assertEquals("2.2", resultSet.getString(3));
           cnt++;
         }
-        Assert.assertEquals(1, cnt);
+        Assert.assertEquals(0, cnt);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a1c6a056d9..669957515d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -23,6 +23,13 @@ import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.ServerConfigConsistent;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
+import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -50,6 +57,7 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
@@ -64,6 +72,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -71,8 +80,11 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -86,6 +98,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -101,6 +115,14 @@ public class StorageEngine implements IService {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
 
+  /* OperationSync module */
+  private static final boolean isEnableOperationSync =
+      IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+  private static SessionPool operationSyncsessionPool;
+  private static OperationSyncProducer operationSyncProducer;
+  private static OperationSyncDDLProtector operationSyncDDLProtector;
+  private static OperationSyncLogService operationSyncDDLLogService;
+
   /**
    * Time range for dividing storage group, the time unit is the same with IoTDB's
    * TimestampPrecision
@@ -135,7 +157,53 @@ public class StorageEngine implements IService {
   private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
   private List<FlushListener> customFlushListeners = new ArrayList<>();
 
-  private StorageEngine() {}
+  private StorageEngine() {
+    if (isEnableOperationSync) {
+      /* Open OperationSync */
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      // create SessionPool for OperationSync
+      operationSyncsessionPool =
+          new SessionPool(
+              config.getSecondaryAddress(),
+              config.getSecondaryPort(),
+              config.getSecondaryUser(),
+              config.getSecondaryPassword(),
+              5);
+
+      // create operationSyncDDLProtector and operationSyncDDLLogService
+      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+      new Thread(operationSyncDDLProtector).start();
+      operationSyncDDLLogService =
+          new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+      new Thread(operationSyncDDLLogService).start();
+
+      // create OperationSyncProducer
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+      operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+      // create OperationSyncDMLProtector and OperationSyncDMLLogService
+      OperationSyncDMLProtector operationSyncDMLProtector =
+          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+      new Thread(operationSyncDMLProtector).start();
+      OperationSyncLogService operationSyncDMLLogService =
+          new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+      new Thread(operationSyncDMLLogService).start();
+
+      // create OperationSyncConsumer
+      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+        OperationSyncConsumer consumer =
+            new OperationSyncConsumer(
+                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+        new Thread(consumer).start();
+      }
+    } else {
+      operationSyncsessionPool = null;
+      operationSyncProducer = null;
+      operationSyncDDLProtector = null;
+      operationSyncDDLLogService = null;
+    }
+  }
 
   public static StorageEngine getInstance() {
     return InstanceHolder.INSTANCE;
@@ -147,6 +215,45 @@ public class StorageEngine implements IService {
             IoTDBDescriptor.getInstance().getConfig().getPartitionInterval() * 1000L);
   }
 
+  public static void transmitOperationSync(PhysicalPlan physicalPlan) {
+
+    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+    if (planType == null) {
+      // Don't need OperationSync
+      return;
+    }
+
+    // serialize physical plan
+    ByteBuffer buffer;
+    try {
+      int size = physicalPlan.getSerializedSize();
+      ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+      DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+      physicalPlan.serialize(operationSyncSerializeStream);
+      buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
+    } catch (IOException e) {
+      logger.error("OperationSync can't serialize PhysicalPlan", e);
+      return;
+    }
+
+    switch (planType) {
+      case DDLPlan:
+        // Create OperationSyncWriteTask and wait
+        OperationSyncWriteTask ddlTask =
+            new OperationSyncWriteTask(
+                buffer,
+                operationSyncsessionPool,
+                operationSyncDDLProtector,
+                operationSyncDDLLogService);
+        ddlTask.run();
+        break;
+      case DMLPlan:
+        // Put into OperationSyncProducer
+        operationSyncProducer.put(new Pair<>(buffer, planType));
+    }
+  }
+
   public static long convertMilliWithPrecision(long milliTime) {
     long result = milliTime;
     String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 3bf7144e73..a4e707a6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -18,6 +18,8 @@
 package org.apache.iotdb.db.protocol.mqtt;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -44,7 +46,8 @@ public class PublishHandler extends AbstractInterceptHandler {
 
   private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
   private long sessionId;
-
+  private static final boolean isEnableOperationSync =
+      IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
   private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
 
   private final PayloadFormatter payloadFormat;
@@ -123,6 +126,10 @@ public class PublishHandler extends AbstractInterceptHandler {
         if (tsStatus != null) {
           LOG.warn(tsStatus.message);
         } else {
+          if (isEnableOperationSync) {
+            // OperationSync should transmit before execute
+            StorageEngine.transmitOperationSync(plan);
+          }
           status = serviceProvider.executeNonQuery(plan);
         }
       } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5717f3ca5e..44a911f6cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,13 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
-import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncLogService;
 import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
-import org.apache.iotdb.db.doublelive.OperationSyncProducer;
-import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -132,7 +127,6 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
-import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -140,14 +134,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -158,8 +149,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -180,13 +169,8 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIService.Iface {
 
-  /* OperationSync module */
   private static final boolean isEnableOperationSync =
       IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
-  private final SessionPool operationSyncsessionPool;
-  private final OperationSyncProducer operationSyncProducer;
-  private final OperationSyncDDLProtector operationSyncDDLProtector;
-  private final OperationSyncLogService operationSyncDDLLogService;
 
   protected class QueryTask implements Callable<TSExecuteStatementResp> {
 
@@ -331,51 +315,6 @@ public class TSServiceImpl implements TSIService.Iface {
   public TSServiceImpl() {
     super();
     serviceProvider = IoTDB.serviceProvider;
-    if (isEnableOperationSync) {
-      /* Open OperationSync */
-      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-      // create SessionPool for OperationSync
-      operationSyncsessionPool =
-          new SessionPool(
-              config.getSecondaryAddress(),
-              config.getSecondaryPort(),
-              config.getSecondaryUser(),
-              config.getSecondaryPassword(),
-              5);
-
-      // create operationSyncDDLProtector and operationSyncDDLLogService
-      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
-      new Thread(operationSyncDDLProtector).start();
-      operationSyncDDLLogService =
-          new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
-      new Thread(operationSyncDDLLogService).start();
-
-      // create OperationSyncProducer
-      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
-      operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
-      // create OperationSyncDMLProtector and OperationSyncDMLLogService
-      OperationSyncDMLProtector operationSyncDMLProtector =
-          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
-      new Thread(operationSyncDMLProtector).start();
-      OperationSyncLogService operationSyncDMLLogService =
-          new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
-      new Thread(operationSyncDMLLogService).start();
-
-      // create OperationSyncConsumer
-      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
-        OperationSyncConsumer consumer =
-            new OperationSyncConsumer(
-                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
-        new Thread(consumer).start();
-      }
-    } else {
-      operationSyncsessionPool = null;
-      operationSyncProducer = null;
-      operationSyncDDLProtector = null;
-      operationSyncDDLLogService = null;
-    }
   }
 
   @Override
@@ -2224,7 +2163,7 @@ public class TSServiceImpl implements TSIService.Iface {
     try {
       if (isEnableOperationSync) {
         // OperationSync should transmit before execute
-        transmitOperationSync(plan);
+        StorageEngine.transmitOperationSync(plan);
       }
       return serviceProvider.executeNonQuery(plan)
           ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
@@ -2240,45 +2179,6 @@ public class TSServiceImpl implements TSIService.Iface {
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
 
-  private void transmitOperationSync(PhysicalPlan physicalPlan) {
-
-    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
-        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
-    if (planType == null) {
-      // Don't need OperationSync
-      return;
-    }
-
-    // serialize physical plan
-    ByteBuffer buffer;
-    try {
-      int size = physicalPlan.getSerializedSize();
-      ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
-      DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
-      physicalPlan.serialize(operationSyncSerializeStream);
-      buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
-    } catch (IOException e) {
-      LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
-      return;
-    }
-
-    switch (planType) {
-      case DDLPlan:
-        // Create OperationSyncWriteTask and wait
-        OperationSyncWriteTask ddlTask =
-            new OperationSyncWriteTask(
-                buffer,
-                operationSyncsessionPool,
-                operationSyncDDLProtector,
-                operationSyncDDLLogService);
-        ddlTask.run();
-        break;
-      case DMLPlan:
-        // Put into OperationSyncProducer
-        operationSyncProducer.put(new Pair<>(buffer, planType));
-    }
-  }
-
   /** Add stat of operation into metrics */
   private void addOperationLatency(Operation operation, long startTime) {
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {