You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/07/13 09:31:25 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13] Fix operation sync endless loop (#6665)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 52b2db4c96 [To rel/0.13] Fix operation sync endless loop (#6665)
52b2db4c96 is described below

commit 52b2db4c964b4ce2a8c696e1460f99efeb33a903
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Jul 13 17:31:20 2022 +0800

    [To rel/0.13] Fix operation sync endless loop (#6665)
---
 .../service/basic/StandaloneServiceProvider.java   | 114 ---------------------
 .../db/service/thrift/impl/TSServiceImpl.java      | 108 +++++++++++++++++++
 2 files changed, 108 insertions(+), 114 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
index bddb0cac9d..a931ffe27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
@@ -18,15 +18,7 @@
  */
 package org.apache.iotdb.db.service.basic;
 
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
-import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncLogService;
-import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
-import org.apache.iotdb.db.doublelive.OperationSyncProducer;
-import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageEngineReadonlyException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -36,73 +28,11 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 
 public class StandaloneServiceProvider extends ServiceProvider {
 
-  /* OperationSync module */
-  private static final boolean isEnableOperationSync =
-      IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
-  private final SessionPool operationSyncsessionPool;
-  private final OperationSyncProducer operationSyncProducer;
-  private final OperationSyncDDLProtector operationSyncDDLProtector;
-  private final OperationSyncLogService operationSyncDDLLogService;
-
   public StandaloneServiceProvider() throws QueryProcessException {
     super(new PlanExecutor());
-    if (isEnableOperationSync) {
-      /* Open OperationSync */
-      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-      // create SessionPool for OperationSync
-      operationSyncsessionPool =
-          new SessionPool(
-              config.getSecondaryAddress(),
-              config.getSecondaryPort(),
-              config.getSecondaryUser(),
-              config.getSecondaryPassword(),
-              5);
-
-      // create operationSyncDDLProtector and operationSyncDDLLogService
-      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
-      new Thread(operationSyncDDLProtector).start();
-      operationSyncDDLLogService =
-          new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
-      new Thread(operationSyncDDLLogService).start();
-
-      // create OperationSyncProducer
-      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
-          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
-      operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
-      // create OperationSyncDMLProtector and OperationSyncDMLLogService
-      OperationSyncDMLProtector operationSyncDMLProtector =
-          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
-      new Thread(operationSyncDMLProtector).start();
-      OperationSyncLogService operationSyncDMLLogService =
-          new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
-      new Thread(operationSyncDMLLogService).start();
-
-      // create OperationSyncConsumer
-      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
-        OperationSyncConsumer consumer =
-            new OperationSyncConsumer(
-                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
-        new Thread(consumer).start();
-      }
-    } else {
-      operationSyncsessionPool = null;
-      operationSyncProducer = null;
-      operationSyncDDLProtector = null;
-      operationSyncDDLLogService = null;
-    }
   }
 
   @Override
@@ -121,50 +51,6 @@ public class StandaloneServiceProvider extends ServiceProvider {
       throw new StorageEngineReadonlyException();
     }
 
-    if (isEnableOperationSync) {
-      // OperationSync should transmit before execute
-      transmitOperationSync(plan);
-    }
-
     return executor.processNonQuery(plan);
   }
-
-  private void transmitOperationSync(PhysicalPlan physicalPlan) {
-
-    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
-        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
-    if (planType == null) {
-      // Don't need OperationSync
-      return;
-    }
-
-    // serialize physical plan
-    ByteBuffer buffer;
-    try {
-      int size = physicalPlan.getSerializedSize();
-      ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
-      DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
-      physicalPlan.serialize(operationSyncSerializeStream);
-      buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
-    } catch (IOException e) {
-      LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
-      return;
-    }
-
-    switch (planType) {
-      case DDLPlan:
-        // Create OperationSyncWriteTask and wait
-        OperationSyncWriteTask ddlTask =
-            new OperationSyncWriteTask(
-                buffer,
-                operationSyncsessionPool,
-                operationSyncDDLProtector,
-                operationSyncDDLLogService);
-        ddlTask.run();
-        break;
-      case DMLPlan:
-        // Put into OperationSyncProducer
-        operationSyncProducer.put(new Pair<>(buffer, planType));
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7e3ee0b5e0..5717f3ca5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,7 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
 import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
 import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -126,6 +132,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -133,11 +140,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -148,6 +158,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -168,6 +180,14 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIService.Iface {
 
+  /* OperationSync module */
+  private static final boolean isEnableOperationSync =
+      IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+  private final SessionPool operationSyncsessionPool;
+  private final OperationSyncProducer operationSyncProducer;
+  private final OperationSyncDDLProtector operationSyncDDLProtector;
+  private final OperationSyncLogService operationSyncDDLLogService;
+
   protected class QueryTask implements Callable<TSExecuteStatementResp> {
 
     private PhysicalPlan plan;
@@ -311,6 +331,51 @@ public class TSServiceImpl implements TSIService.Iface {
   public TSServiceImpl() {
     super();
     serviceProvider = IoTDB.serviceProvider;
+    if (isEnableOperationSync) {
+      /* Open OperationSync */
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      // create SessionPool for OperationSync
+      operationSyncsessionPool =
+          new SessionPool(
+              config.getSecondaryAddress(),
+              config.getSecondaryPort(),
+              config.getSecondaryUser(),
+              config.getSecondaryPassword(),
+              5);
+
+      // create operationSyncDDLProtector and operationSyncDDLLogService
+      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+      new Thread(operationSyncDDLProtector).start();
+      operationSyncDDLLogService =
+          new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+      new Thread(operationSyncDDLLogService).start();
+
+      // create OperationSyncProducer
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+      operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+      // create OperationSyncDMLProtector and OperationSyncDMLLogService
+      OperationSyncDMLProtector operationSyncDMLProtector =
+          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+      new Thread(operationSyncDMLProtector).start();
+      OperationSyncLogService operationSyncDMLLogService =
+          new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+      new Thread(operationSyncDMLLogService).start();
+
+      // create OperationSyncConsumer
+      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+        OperationSyncConsumer consumer =
+            new OperationSyncConsumer(
+                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+        new Thread(consumer).start();
+      }
+    } else {
+      operationSyncsessionPool = null;
+      operationSyncProducer = null;
+      operationSyncDDLProtector = null;
+      operationSyncDDLLogService = null;
+    }
   }
 
   @Override
@@ -2157,6 +2222,10 @@ public class TSServiceImpl implements TSIService.Iface {
 
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     try {
+      if (isEnableOperationSync) {
+        // OperationSync should transmit before execute
+        transmitOperationSync(plan);
+      }
       return serviceProvider.executeNonQuery(plan)
           ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
           : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
@@ -2171,6 +2240,45 @@ public class TSServiceImpl implements TSIService.Iface {
         "Log in failed. Either you are not authorized or the session has timed out.");
   }
 
+  private void transmitOperationSync(PhysicalPlan physicalPlan) {
+
+    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+    if (planType == null) {
+      // Don't need OperationSync
+      return;
+    }
+
+    // serialize physical plan
+    ByteBuffer buffer;
+    try {
+      int size = physicalPlan.getSerializedSize();
+      ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+      DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+      physicalPlan.serialize(operationSyncSerializeStream);
+      buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
+    } catch (IOException e) {
+      LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
+      return;
+    }
+
+    switch (planType) {
+      case DDLPlan:
+        // Create OperationSyncWriteTask and wait
+        OperationSyncWriteTask ddlTask =
+            new OperationSyncWriteTask(
+                buffer,
+                operationSyncsessionPool,
+                operationSyncDDLProtector,
+                operationSyncDDLLogService);
+        ddlTask.run();
+        break;
+      case DMLPlan:
+        // Put into OperationSyncProducer
+        operationSyncProducer.put(new Pair<>(buffer, planType));
+    }
+  }
+
   /** Add stat of operation into metrics */
   private void addOperationLatency(Operation operation, long startTime) {
     if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {