You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/14 03:49:37 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] Support MQTT operation sync (#6666)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 1a9501642b [To rel/0.13] Support MQTT operation sync (#6666)
1a9501642b is described below
commit 1a9501642b3fad2cce51fe589b236409f106f273
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu Jul 14 11:49:32 2022 +0800
[To rel/0.13] Support MQTT operation sync (#6666)
---
.../iotdb/db/integration/IoTDBRestartIT.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 109 ++++++++++++++++++++-
.../iotdb/db/protocol/mqtt/PublishHandler.java | 9 +-
.../db/service/thrift/impl/TSServiceImpl.java | 104 +-------------------
4 files changed, 119 insertions(+), 105 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 63cc8403c2..c6984d9f23 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -442,7 +442,7 @@ public class IoTDBRestartIT {
Assert.assertEquals("2.2", resultSet.getString(3));
cnt++;
}
- Assert.assertEquals(1, cnt);
+ Assert.assertEquals(0, cnt);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a1c6a056d9..669957515d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -23,6 +23,13 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.ServerConfigConsistent;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
+import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -50,6 +57,7 @@ import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
@@ -64,6 +72,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -71,8 +80,11 @@ import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -86,6 +98,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -101,6 +115,14 @@ public class StorageEngine implements IService {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+ /* OperationSync module */
+ private static final boolean isEnableOperationSync =
+ IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+ private static SessionPool operationSyncsessionPool;
+ private static OperationSyncProducer operationSyncProducer;
+ private static OperationSyncDDLProtector operationSyncDDLProtector;
+ private static OperationSyncLogService operationSyncDDLLogService;
+
/**
* Time range for dividing storage group, the time unit is the same with IoTDB's
* TimestampPrecision
@@ -135,7 +157,53 @@ public class StorageEngine implements IService {
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
- private StorageEngine() {}
+ private StorageEngine() {
+ if (isEnableOperationSync) {
+ /* OperationSync is enabled: wire up the session pool, protectors, log services, producer and consumers */
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); // local; shadows the static config field
+ // Session pool built from the configured secondary address, port, user and password
+ operationSyncsessionPool =
+ new SessionPool(
+ config.getSecondaryAddress(),
+ config.getSecondaryPort(),
+ config.getSecondaryUser(),
+ config.getSecondaryPassword(),
+ 5); // presumably the pool's maximum size — TODO confirm against SessionPool
+ // NOTE(review): the operationSync* fields assigned here are static, yet written from an instance constructor
+ // DDL protector and DDL log service, each started on its own thread
+ operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+ new Thread(operationSyncDDLProtector).start();
+ operationSyncDDLLogService =
+ new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+ new Thread(operationSyncDDLLogService).start();
+
+ // Producer backed by a bounded queue whose capacity is getOperationSyncProducerCacheSize()
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+ operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+ // DML protector (given the DDL protector and the producer) and DML log service, each on its own thread
+ OperationSyncDMLProtector operationSyncDMLProtector =
+ new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+ new Thread(operationSyncDMLProtector).start();
+ OperationSyncLogService operationSyncDMLLogService =
+ new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+ new Thread(operationSyncDMLLogService).start();
+
+ // Start getOperationSyncConsumerConcurrencySize() consumers sharing the queue, session pool and DML log service
+ for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+ OperationSyncConsumer consumer =
+ new OperationSyncConsumer(
+ blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+ new Thread(consumer).start();
+ }
+ } else { // OperationSync disabled: every static OperationSync field is explicitly set to null
+ operationSyncsessionPool = null;
+ operationSyncProducer = null;
+ operationSyncDDLProtector = null;
+ operationSyncDDLLogService = null;
+ }
+ }
public static StorageEngine getInstance() {
return InstanceHolder.INSTANCE;
@@ -147,6 +215,45 @@ public class StorageEngine implements IService {
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval() * 1000L);
}
+ public static void transmitOperationSync(PhysicalPlan physicalPlan) {
+ // Serializes the plan, then runs a write task inline for DDL plans or enqueues DML plans via the producer
+ OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+ OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+ if (planType == null) {
+ // getOperationSyncPlanType returned null: nothing to transmit for this plan
+ return;
+ }
+
+ // Serialize the plan into a ByteBuffer; on IOException the error is logged and the plan is not transmitted
+ ByteBuffer buffer;
+ try {
+ int size = physicalPlan.getSerializedSize();
+ ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+ DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+ physicalPlan.serialize(operationSyncSerializeStream);
+ buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
+ } catch (IOException e) {
+ logger.error("OperationSync can't serialize PhysicalPlan", e);
+ return;
+ }
+
+ switch (planType) {
+ case DDLPlan:
+ // run() is called directly (no new thread), so the DDL write task executes on the caller's thread
+ OperationSyncWriteTask ddlTask =
+ new OperationSyncWriteTask(
+ buffer,
+ operationSyncsessionPool,
+ operationSyncDDLProtector,
+ operationSyncDDLLogService);
+ ddlTask.run();
+ break;
+ case DMLPlan:
+ // Enqueue via the producer. NOTE(review): these statics are set only in the constructor — confirm init order
+ operationSyncProducer.put(new Pair<>(buffer, planType));
+ }
+ }
+
public static long convertMilliWithPrecision(long milliTime) {
long result = milliTime;
String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
index 3bf7144e73..a4e707a6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java
@@ -18,6 +18,8 @@
package org.apache.iotdb.db.protocol.mqtt;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -44,7 +46,8 @@ public class PublishHandler extends AbstractInterceptHandler {
private final ServiceProvider serviceProvider = IoTDB.serviceProvider;
private long sessionId;
-
+ private static final boolean isEnableOperationSync =
+ IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
private final PayloadFormatter payloadFormat;
@@ -123,6 +126,10 @@ public class PublishHandler extends AbstractInterceptHandler {
if (tsStatus != null) {
LOG.warn(tsStatus.message);
} else {
+ if (isEnableOperationSync) {
+ // OperationSync should transmit before execute
+ StorageEngine.transmitOperationSync(plan);
+ }
status = serviceProvider.executeNonQuery(plan);
}
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 5717f3ca5e..44a911f6cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,13 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
-import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
-import org.apache.iotdb.db.doublelive.OperationSyncLogService;
import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
-import org.apache.iotdb.db.doublelive.OperationSyncProducer;
-import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -132,7 +127,6 @@ import org.apache.iotdb.service.rpc.thrift.TSSetUsingTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
-import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -140,14 +134,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -158,8 +149,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -180,13 +169,8 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl implements TSIService.Iface {
- /* OperationSync module */
private static final boolean isEnableOperationSync =
IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
- private final SessionPool operationSyncsessionPool;
- private final OperationSyncProducer operationSyncProducer;
- private final OperationSyncDDLProtector operationSyncDDLProtector;
- private final OperationSyncLogService operationSyncDDLLogService;
protected class QueryTask implements Callable<TSExecuteStatementResp> {
@@ -331,51 +315,6 @@ public class TSServiceImpl implements TSIService.Iface {
public TSServiceImpl() {
super();
serviceProvider = IoTDB.serviceProvider;
- if (isEnableOperationSync) {
- /* Open OperationSync */
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // create SessionPool for OperationSync
- operationSyncsessionPool =
- new SessionPool(
- config.getSecondaryAddress(),
- config.getSecondaryPort(),
- config.getSecondaryUser(),
- config.getSecondaryPassword(),
- 5);
-
- // create operationSyncDDLProtector and operationSyncDDLLogService
- operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
- new Thread(operationSyncDDLProtector).start();
- operationSyncDDLLogService =
- new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
- new Thread(operationSyncDDLLogService).start();
-
- // create OperationSyncProducer
- BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
- operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
- // create OperationSyncDMLProtector and OperationSyncDMLLogService
- OperationSyncDMLProtector operationSyncDMLProtector =
- new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
- new Thread(operationSyncDMLProtector).start();
- OperationSyncLogService operationSyncDMLLogService =
- new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
- new Thread(operationSyncDMLLogService).start();
-
- // create OperationSyncConsumer
- for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
- OperationSyncConsumer consumer =
- new OperationSyncConsumer(
- blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
- new Thread(consumer).start();
- }
- } else {
- operationSyncsessionPool = null;
- operationSyncProducer = null;
- operationSyncDDLProtector = null;
- operationSyncDDLLogService = null;
- }
}
@Override
@@ -2224,7 +2163,7 @@ public class TSServiceImpl implements TSIService.Iface {
try {
if (isEnableOperationSync) {
// OperationSync should transmit before execute
- transmitOperationSync(plan);
+ StorageEngine.transmitOperationSync(plan);
}
return serviceProvider.executeNonQuery(plan)
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
@@ -2240,45 +2179,6 @@ public class TSServiceImpl implements TSIService.Iface {
"Log in failed. Either you are not authorized or the session has timed out.");
}
- private void transmitOperationSync(PhysicalPlan physicalPlan) {
-
- OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
- OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
- if (planType == null) {
- // Don't need OperationSync
- return;
- }
-
- // serialize physical plan
- ByteBuffer buffer;
- try {
- int size = physicalPlan.getSerializedSize();
- ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
- DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
- physicalPlan.serialize(operationSyncSerializeStream);
- buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
- } catch (IOException e) {
- LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
- return;
- }
-
- switch (planType) {
- case DDLPlan:
- // Create OperationSyncWriteTask and wait
- OperationSyncWriteTask ddlTask =
- new OperationSyncWriteTask(
- buffer,
- operationSyncsessionPool,
- operationSyncDDLProtector,
- operationSyncDDLLogService);
- ddlTask.run();
- break;
- case DMLPlan:
- // Put into OperationSyncProducer
- operationSyncProducer.put(new Pair<>(buffer, planType));
- }
- }
-
/** Add stat of operation into metrics */
private void addOperationLatency(Operation operation, long startTime) {
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnablePerformanceStat()) {