You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/07/13 09:31:25 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] Fix operation sync endless loop (#6665)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 52b2db4c96 [To rel/0.13] Fix operation sync endless loop (#6665)
52b2db4c96 is described below
commit 52b2db4c964b4ce2a8c696e1460f99efeb33a903
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Jul 13 17:31:20 2022 +0800
[To rel/0.13] Fix operation sync endless loop (#6665)
---
.../service/basic/StandaloneServiceProvider.java | 114 ---------------------
.../db/service/thrift/impl/TSServiceImpl.java | 108 +++++++++++++++++++
2 files changed, 108 insertions(+), 114 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
index bddb0cac9d..a931ffe27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/StandaloneServiceProvider.java
@@ -18,15 +18,7 @@
*/
package org.apache.iotdb.db.service.basic;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
-import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncLogService;
-import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
-import org.apache.iotdb.db.doublelive.OperationSyncProducer;
-import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageEngineReadonlyException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -36,73 +28,11 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
public class StandaloneServiceProvider extends ServiceProvider {
- /* OperationSync module */
- private static final boolean isEnableOperationSync =
- IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
- private final SessionPool operationSyncsessionPool;
- private final OperationSyncProducer operationSyncProducer;
- private final OperationSyncDDLProtector operationSyncDDLProtector;
- private final OperationSyncLogService operationSyncDDLLogService;
-
public StandaloneServiceProvider() throws QueryProcessException {
super(new PlanExecutor());
- if (isEnableOperationSync) {
- /* Open OperationSync */
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // create SessionPool for OperationSync
- operationSyncsessionPool =
- new SessionPool(
- config.getSecondaryAddress(),
- config.getSecondaryPort(),
- config.getSecondaryUser(),
- config.getSecondaryPassword(),
- 5);
-
- // create operationSyncDDLProtector and operationSyncDDLLogService
- operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
- new Thread(operationSyncDDLProtector).start();
- operationSyncDDLLogService =
- new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
- new Thread(operationSyncDDLLogService).start();
-
- // create OperationSyncProducer
- BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
- operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
- // create OperationSyncDMLProtector and OperationSyncDMLLogService
- OperationSyncDMLProtector operationSyncDMLProtector =
- new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
- new Thread(operationSyncDMLProtector).start();
- OperationSyncLogService operationSyncDMLLogService =
- new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
- new Thread(operationSyncDMLLogService).start();
-
- // create OperationSyncConsumer
- for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
- OperationSyncConsumer consumer =
- new OperationSyncConsumer(
- blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
- new Thread(consumer).start();
- }
- } else {
- operationSyncsessionPool = null;
- operationSyncProducer = null;
- operationSyncDDLProtector = null;
- operationSyncDDLLogService = null;
- }
}
@Override
@@ -121,50 +51,6 @@ public class StandaloneServiceProvider extends ServiceProvider {
throw new StorageEngineReadonlyException();
}
- if (isEnableOperationSync) {
- // OperationSync should transmit before execute
- transmitOperationSync(plan);
- }
-
return executor.processNonQuery(plan);
}
-
- private void transmitOperationSync(PhysicalPlan physicalPlan) {
-
- OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
- OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
- if (planType == null) {
- // Don't need OperationSync
- return;
- }
-
- // serialize physical plan
- ByteBuffer buffer;
- try {
- int size = physicalPlan.getSerializedSize();
- ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
- DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
- physicalPlan.serialize(operationSyncSerializeStream);
- buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
- } catch (IOException e) {
- LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
- return;
- }
-
- switch (planType) {
- case DDLPlan:
- // Create OperationSyncWriteTask and wait
- OperationSyncWriteTask ddlTask =
- new OperationSyncWriteTask(
- buffer,
- operationSyncsessionPool,
- operationSyncDDLProtector,
- operationSyncDDLLogService);
- ddlTask.run();
- break;
- case DMLPlan:
- // Put into OperationSyncProducer
- operationSyncProducer.put(new Pair<>(buffer, planType));
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 7e3ee0b5e0..5717f3ca5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,7 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -126,6 +132,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -133,11 +140,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -148,6 +158,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -168,6 +180,14 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIService.Iface {
+ /* OperationSync module */
+ private static final boolean isEnableOperationSync =
+ IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+ private final SessionPool operationSyncsessionPool;
+ private final OperationSyncProducer operationSyncProducer;
+ private final OperationSyncDDLProtector operationSyncDDLProtector;
+ private final OperationSyncLogService operationSyncDDLLogService;
+
protected class QueryTask implements Callable<TSExecuteStatementResp> {
private PhysicalPlan plan;
@@ -311,6 +331,51 @@ public class TSServiceImpl implements TSIService.Iface {
public TSServiceImpl() {
super();
serviceProvider = IoTDB.serviceProvider;
+ if (isEnableOperationSync) {
+ /* Open OperationSync */
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // create SessionPool for OperationSync
+ operationSyncsessionPool =
+ new SessionPool(
+ config.getSecondaryAddress(),
+ config.getSecondaryPort(),
+ config.getSecondaryUser(),
+ config.getSecondaryPassword(),
+ 5);
+
+ // create operationSyncDDLProtector and operationSyncDDLLogService
+ operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+ new Thread(operationSyncDDLProtector).start();
+ operationSyncDDLLogService =
+ new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+ new Thread(operationSyncDDLLogService).start();
+
+ // create OperationSyncProducer
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+ operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+ // create OperationSyncDMLProtector and OperationSyncDMLLogService
+ OperationSyncDMLProtector operationSyncDMLProtector =
+ new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+ new Thread(operationSyncDMLProtector).start();
+ OperationSyncLogService operationSyncDMLLogService =
+ new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+ new Thread(operationSyncDMLLogService).start();
+
+ // create OperationSyncConsumer
+ for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+ OperationSyncConsumer consumer =
+ new OperationSyncConsumer(
+ blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+ new Thread(consumer).start();
+ }
+ } else {
+ operationSyncsessionPool = null;
+ operationSyncProducer = null;
+ operationSyncDDLProtector = null;
+ operationSyncDDLLogService = null;
+ }
}
@Override
@@ -2157,6 +2222,10 @@ public class TSServiceImpl implements TSIService.Iface {
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
try {
+ if (isEnableOperationSync) {
+ // OperationSync should transmit before execute
+ transmitOperationSync(plan);
+ }
return serviceProvider.executeNonQuery(plan)
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
@@ -2171,6 +2240,45 @@ public class TSServiceImpl implements TSIService.Iface {
"Log in failed. Either you are not authorized or the session has timed out.");
}
+ private void transmitOperationSync(PhysicalPlan physicalPlan) {
+
+ OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+ OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+ if (planType == null) {
+ // Don't need OperationSync
+ return;
+ }
+
+ // serialize physical plan
+ ByteBuffer buffer;
+ try {
+ int size = physicalPlan.getSerializedSize();
+ ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+ DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+ physicalPlan.serialize(operationSyncSerializeStream);
+ buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
+ } catch (IOException e) {
+ LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
+ return;
+ }
+
+ switch (planType) {
+ case DDLPlan:
+ // Create OperationSyncWriteTask and wait
+ OperationSyncWriteTask ddlTask =
+ new OperationSyncWriteTask(
+ buffer,
+ operationSyncsessionPool,
+ operationSyncDDLProtector,
+ operationSyncDDLLogService);
+ ddlTask.run();
+ break;
+ case DMLPlan:
+ // Put into OperationSyncProducer
+ operationSyncProducer.put(new Pair<>(buffer, planType));
+ }
+ }
+
/** Add stat of operation into metrics */
private void addOperationLatency(Operation operation, long startTime) {
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {