You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/09/27 01:27:31 UTC
[iotdb] branch master updated: [IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 85329a92a8 [IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)
85329a92a8 is described below
commit 85329a92a89ac8c05b68fc9cb40fd96b7001635b
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Sep 27 09:27:25 2022 +0800
[IOTDB-4491] Achieve idempotence of Create Trigger On DataNode (#7409)
---
.../commons/executable/ExecutableManager.java | 107 ++++++++++---
.../apache/iotdb/commons/service/ServiceType.java | 3 -
.../iotdb/commons/trigger/TriggerInformation.java | 17 +-
.../apache/iotdb/commons/trigger/TriggerTable.java | 25 ++-
.../trigger/service/TriggerExecutableManager.java | 34 +---
.../commons/udf/service/UDFExecutableManager.java | 6 +
.../udf/service/UDFRegistrationService.java | 6 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../engine/trigger/executor/TriggerExecutor.java | 2 +-
.../trigger/service/TriggerClassLoaderManager.java | 2 +-
.../service/TriggerRegistrationService.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 84 +++++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 17 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 15 +-
.../db}/trigger/service/TriggerClassLoader.java | 2 +-
.../trigger/service/TriggerClassLoaderManager.java | 55 +++----
.../trigger/service/TriggerManagementService.java | 176 +++++++++++----------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +
.../src/main/thrift/confignode.thrift | 4 +-
thrift/src/main/thrift/datanode.thrift | 8 +-
.../iotdb/trigger/api/enums/FailureStrategy.java | 37 +++--
21 files changed, 376 insertions(+), 230 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index 4be42361ab..b069c6365a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -59,27 +60,39 @@ public class ExecutableManager {
public ExecutableResource request(List<String> uris) throws URISyntaxException, IOException {
final long requestId = generateNextRequestId();
downloadExecutables(uris, requestId);
- return new ExecutableResource(requestId, getDirStringByRequestId(requestId));
+ return new ExecutableResource(requestId, getDirStringUnderTempRootByRequestId(requestId));
}
- public void moveToExtLibDir(ExecutableResource resource, String name) throws IOException {
- FileUtils.moveDirectory(getDirByRequestId(resource.getRequestId()), getDirByName(name));
+ public void moveTempDirToExtLibDir(ExecutableResource resource, String name) throws IOException {
+ FileUtils.moveDirectory(
+ getDirUnderTempRootByRequestId(resource.getRequestId()), getDirUnderLibRootByName(name));
}
- public void removeFromTemporaryLibRoot(ExecutableResource resource) {
- removeFromTemporaryLibRoot(resource.getRequestId());
+ public void moveFileUnderTempRootToExtLibDir(ExecutableResource resource, String name)
+ throws IOException {
+ FileUtils.moveFileToDirectory(
+ getFileByFullPath(
+ getDirStringUnderTempRootByRequestId(resource.getRequestId()) + File.separator + name),
+ getFileByFullPath(libRoot),
+ false);
+ }
+
+ public void copyFileToExtLibDir(String filePath) throws IOException {
+ FileUtils.copyFileToDirectory(
+ FSFactoryProducer.getFSFactory().getFile(filePath),
+ FSFactoryProducer.getFSFactory().getFile(this.libRoot));
}
- public void removeFromExtLibDir(String functionName) {
- FileUtils.deleteQuietly(getDirByName(functionName));
+ public void removeFromTemporaryLibRoot(ExecutableResource resource) {
+ removeFromTemporaryLibRoot(resource.getRequestId());
}
private synchronized long generateNextRequestId() throws IOException {
long requestId = requestCounter.getAndIncrement();
- while (FileUtils.isDirectory(getDirByRequestId(requestId))) {
+ while (FileUtils.isDirectory(getDirUnderTempRootByRequestId(requestId))) {
requestId = requestCounter.getAndIncrement();
}
- FileUtils.forceMkdir(getDirByRequestId(requestId));
+ FileUtils.forceMkdir(getDirUnderTempRootByRequestId(requestId));
return requestId;
}
@@ -101,29 +114,42 @@ public class ExecutableManager {
}
private void removeFromTemporaryLibRoot(long requestId) {
- FileUtils.deleteQuietly(getDirByRequestId(requestId));
+ FileUtils.deleteQuietly(getDirUnderTempRootByRequestId(requestId));
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// dir string and dir file generation
/////////////////////////////////////////////////////////////////////////////////////////////////
- public File getDirByRequestId(long requestId) {
- return FSFactoryProducer.getFSFactory().getFile(getDirStringByRequestId(requestId));
+ public File getDirUnderTempRootByRequestId(long requestId) {
+ return FSFactoryProducer.getFSFactory()
+ .getFile(getDirStringUnderTempRootByRequestId(requestId));
}
- public String getDirStringByRequestId(long requestId) {
+ public String getDirStringUnderTempRootByRequestId(long requestId) {
return temporaryLibRoot + File.separator + requestId + File.separator;
}
- public File getDirByName(String name) {
- return FSFactoryProducer.getFSFactory().getFile(getDirStringByName(name));
+ public File getDirUnderLibRootByName(String name) {
+ return FSFactoryProducer.getFSFactory().getFile(getDirStringUnderLibRootByName(name));
}
- public String getDirStringByName(String name) {
+ public String getDirStringUnderLibRootByName(String name) {
return libRoot + File.separator + name + File.separator;
}
+ public File getFileUnderLibRootByName(String name) {
+ return FSFactoryProducer.getFSFactory().getFile(getFileStringUnderLibRootByName(name));
+ }
+
+ public String getFileStringUnderLibRootByName(String name) {
+ return libRoot + File.separator + name;
+ }
+
+ private File getFileByFullPath(String path) {
+ return FSFactoryProducer.getFSFactory().getFile(path);
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// transfer jar file to bytebuffer for thrift
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -138,12 +164,11 @@ public class ExecutableManager {
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
fileChannel.read(byteBuffer);
+ byteBuffer.flip();
return byteBuffer;
} catch (Exception e) {
LOGGER.warn(
- "Error occurred during transferring file{} to ByteBuffer, the cause is {}",
- filePath,
- e.getMessage());
+ "Error occurred during transferring file{} to ByteBuffer, the cause is {}", filePath, e);
throw e;
}
}
@@ -156,14 +181,48 @@ public class ExecutableManager {
String destination = this.libRoot + File.separator + fileName;
Path path = Paths.get(destination);
Files.deleteIfExists(path);
- try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE)) {
- fileChannel.write(byteBuffer);
+ Files.createFile(path);
+ try (FileOutputStream outputStream = new FileOutputStream(destination)) {
+ outputStream.getChannel().write(byteBuffer);
} catch (IOException e) {
LOGGER.warn(
- "Error occurred during writing bytebuffer to {} , the cause is {}",
- destination,
- e.getMessage());
+ "Error occurred during writing bytebuffer to {} , the cause is {}", destination, e);
throw e;
}
}
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // other functions
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param fileName given file name
+ * @return true if file exists under LibRoot
+ */
+ public boolean hasFileUnderLibRoot(String fileName) {
+ return Files.exists(Paths.get(this.libRoot + File.separator + fileName));
+ }
+
+ public boolean hasFileUnderTemporaryRoot(String fileName) {
+ return Files.exists(Paths.get(this.temporaryLibRoot + File.separator + fileName));
+ }
+
+ public void saveTextAsFileUnderTemporaryRoot(String text, String fileName) throws IOException {
+ Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+ Files.deleteIfExists(path);
+ Files.write(path, text.getBytes());
+ }
+
+ public String readTextFromFileUnderTemporaryRoot(String fileName) throws IOException {
+ Path path = Paths.get(this.temporaryLibRoot + File.separator + fileName);
+ return new String(Files.readAllBytes(path));
+ }
+
+ public String getTemporaryLibRoot() {
+ return temporaryLibRoot;
+ }
+
+ public String getLibRoot() {
+ return libRoot;
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 239ef700be..670fddf37f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -44,9 +44,6 @@ public enum ServiceType {
UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
UDF_REGISTRATION_SERVICE("UDF Registration Service", ""),
UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""),
- TRIGGER_CLASSLOADER_MANAGER_SERVICE("Trigger ClassLoader Manager Service", ""),
- TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""),
- TRIGGER_EXECUTABLE_MANAGER_SERVICE("Trigger Executable Manager Service", ""),
TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
TRIGGER_REGISTRATION_SERVICE_OLD("Old Standalone Trigger Registration Service", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
index f85faf4bdb..0d3649ff6c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerInformation.java
@@ -49,6 +49,9 @@ public class TriggerInformation {
/** only used for Stateful Trigger */
private TDataNodeLocation dataNodeLocation;
+ /** MD5 of the Jar File */
+ private String jarFileMD5;
+
public TriggerInformation() {};
public TriggerInformation(
@@ -60,7 +63,8 @@ public class TriggerInformation {
TriggerEvent event,
TTriggerState triggerState,
boolean isStateful,
- TDataNodeLocation dataNodeLocation) {
+ TDataNodeLocation dataNodeLocation,
+ String jarFileMD5) {
this.pathPattern = pathPattern;
this.triggerName = triggerName;
this.className = className;
@@ -70,6 +74,7 @@ public class TriggerInformation {
this.triggerState = triggerState;
this.isStateful = isStateful;
this.dataNodeLocation = dataNodeLocation;
+ this.jarFileMD5 = jarFileMD5;
}
public ByteBuffer serialize() throws IOException {
@@ -91,6 +96,7 @@ public class TriggerInformation {
if (isStateful) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, outputStream);
}
+ ReadWriteIOUtils.write(jarFileMD5, outputStream);
}
public static TriggerInformation deserialize(ByteBuffer byteBuffer) {
@@ -109,6 +115,7 @@ public class TriggerInformation {
triggerInformation.dataNodeLocation =
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
}
+ triggerInformation.jarFileMD5 = ReadWriteIOUtils.readString(byteBuffer);
return triggerInformation;
}
@@ -179,4 +186,12 @@ public class TriggerInformation {
public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
this.dataNodeLocation = dataNodeLocation;
}
+
+ public String getJarFileMD5() {
+ return jarFileMD5;
+ }
+
+ public void setJarFileMD5(String jarFileMD5) {
+ this.jarFileMD5 = jarFileMD5;
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index effd6ca1be..cee7bb6b4f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -30,10 +30,6 @@ import java.util.Map;
public class TriggerTable {
private final Map<String, TriggerInformation> triggerTable;
- // todo: Maintain a PatternTree: PathPattern -> List<String> triggerNames
- // Given a PathPattern, return the triggerNames of triggers whose PathPatterns match the given
- // one.
-
public TriggerTable() {
triggerTable = new HashMap<>();
}
@@ -45,6 +41,15 @@ public class TriggerTable {
public void addTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
triggerTable.put(triggerName, triggerInformation);
}
+
+ public TriggerInformation getTriggerInformation(String triggerName) {
+ return triggerTable.get(triggerName);
+ }
+
+ public void setTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
+ triggerTable.put(triggerName, triggerInformation);
+ }
+
// for dropTrigger
public void deleteTriggerInformation(String triggerName) {
triggerTable.remove(triggerName);
@@ -54,16 +59,8 @@ public class TriggerTable {
return triggerTable.containsKey(triggerName);
}
- public void activeTrigger(String triggerName) {
- triggerTable.get(triggerName).setTriggerState(TTriggerState.ACTIVE);
- }
-
- public TriggerInformation getTriggerInformation(String triggerName) {
- return triggerTable.get(triggerName);
- }
-
- public void setTriggerInformation(String triggerName, TriggerInformation triggerInformation) {
- triggerTable.put(triggerName, triggerInformation);
+ public void setTriggerState(String triggerName, TTriggerState triggerState) {
+ triggerTable.get(triggerName).setTriggerState(triggerState);
}
// for showTrigger
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
index 25f925b80c..2996ee55a5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerExecutableManager.java
@@ -19,40 +19,16 @@
package org.apache.iotdb.commons.trigger.service;
-import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
-public class TriggerExecutableManager extends ExecutableManager implements IService {
+import java.io.IOException;
+
+public class TriggerExecutableManager extends ExecutableManager {
private TriggerExecutableManager(String temporaryLibRoot, String triggerLibRoot) {
super(temporaryLibRoot, triggerLibRoot);
}
- /////////////////////////////////////////////////////////////////////////////////////////////////
- // IService
- /////////////////////////////////////////////////////////////////////////////////////////////////
- @Override
- public void start() throws StartupException {
- try {
- SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
- SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
- } catch (Exception e) {
- throw new StartupException(e);
- }
- }
-
- @Override
- public void stop() {
- // nothing to do
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.TRIGGER_EXECUTABLE_MANAGER_SERVICE;
- }
-
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -60,8 +36,10 @@ public class TriggerExecutableManager extends ExecutableManager implements IServ
private static TriggerExecutableManager INSTANCE = null;
public static synchronized TriggerExecutableManager setupAndGetInstance(
- String temporaryLibRoot, String triggerLibRoot) {
+ String temporaryLibRoot, String triggerLibRoot) throws IOException {
if (INSTANCE == null) {
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(triggerLibRoot);
INSTANCE = new TriggerExecutableManager(temporaryLibRoot, triggerLibRoot);
}
return INSTANCE;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
index a1cf6f34c8..6ea6f840c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.commons.io.FileUtils;
+
import java.io.File;
import java.io.IOException;
@@ -35,6 +37,10 @@ public class UDFExecutableManager extends ExecutableManager implements IService,
super(temporaryLibRoot, udfLibRoot);
}
+ public void removeUDFJarFromExtLibDir(String functionName) {
+ FileUtils.deleteQuietly(getDirUnderLibRootByName(functionName));
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// IService
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index c0bf6a5d3e..14bb0b510e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -173,10 +173,10 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
try {
final ExecutableResource resource = udfExecutableManager.request(uris);
try {
- udfExecutableManager.removeFromExtLibDir(functionName);
- udfExecutableManager.moveToExtLibDir(resource, functionName);
+ udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
+ udfExecutableManager.moveTempDirToExtLibDir(resource, functionName);
} catch (Exception innerException) {
- udfExecutableManager.removeFromExtLibDir(functionName);
+ udfExecutableManager.removeUDFJarFromExtLibDir(functionName);
udfExecutableManager.removeFromTemporaryLibRoot(resource);
throw innerException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 50d69dc5aa..8f7787c7c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -270,7 +270,7 @@ public class IoTDBConfig {
/** External temporary lib directory for storing downloaded trigger JAR files */
private String triggerTemporaryLibDir =
- IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_TMP_FOLDER_NAME;
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TRIGGER_TMP_FOLDER_NAME;
/** External lib directory for ext Pipe plugins, stores user-defined JAR files */
private String extPipeDir =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
index b782ad5777..b9a4695f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/executor/TriggerExecutor.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.db.engine.trigger.executor;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationInformation;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.trigger.api.TriggerAttributes;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
index 400d2adcbb..dd22241f35 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerClassLoaderManager.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.engine.trigger.service;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.TriggerManagementException;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
index f555c11376..91caddc384 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/trigger/service/TriggerRegistrationService.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -44,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.RowRecord;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 670f6dfc0e..808dbf79d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.TriggerTable;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -95,11 +97,15 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StartPipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.StopPipeStatement;
+import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
@@ -108,10 +114,16 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.SocketTimeoutException;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -292,12 +304,80 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
createTriggerStatement.getTriggerEvent().getId(),
createTriggerStatement.getTriggerType().getId(),
createTriggerStatement.getPathPattern().serialize(),
- createTriggerStatement.getAttributes());
+ createTriggerStatement.getAttributes(),
+ FailureStrategy.OPTIMISTIC.getId()); // set default strategy
- if (!createTriggerStatement.isUsingURI()) {
+ String libRoot = TriggerExecutableManager.getInstance().getLibRoot();
+ if (createTriggerStatement.isUsingURI()) {
+ try {
+ // download executable
+ ExecutableResource resource =
+ TriggerExecutableManager.getInstance()
+ .request(Collections.singletonList(createTriggerStatement.getJarPath()));
+ String uriString = createTriggerStatement.getJarPath();
+ String jarFileName = uriString.substring(uriString.lastIndexOf("/") + 1);
+ // move to ext
+ TriggerExecutableManager.getInstance()
+ .moveFileUnderTempRootToExtLibDir(resource, jarFileName);
+ tCreateTriggerReq.setJarPath(jarFileName);
+ // jarFilePath after moving to ext lib
+ String jarFilePathUnderLib =
+ TriggerExecutableManager.getInstance().getFileStringUnderLibRootByName(jarFileName);
+ tCreateTriggerReq.setJarFile(ExecutableManager.transferToBytebuffer(jarFilePathUnderLib));
+ tCreateTriggerReq.setJarMD5(
+ DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderLib))));
+
+ } catch (IOException | URISyntaxException e) {
+ LOGGER.warn(
+ "Failed to download executable for trigger({}) using URI: {}, the cause is: {}",
+ createTriggerStatement.getTriggerName(),
+ createTriggerStatement.getJarPath(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to download executable for trigger '"
+ + createTriggerStatement.getTriggerName()
+ + "'",
+ TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+ return future;
+ }
+ } else {
+ // change libRoot
+ libRoot = createTriggerStatement.getJarPath();
+ // set jarPath to file name instead of the full path
+ tCreateTriggerReq.setJarPath(new File(createTriggerStatement.getJarPath()).getName());
// If jarPath is a file path, we transfer it to ByteBuffer and send it to ConfigNode.
tCreateTriggerReq.setJarFile(
ExecutableManager.transferToBytebuffer(createTriggerStatement.getJarPath()));
+ // set md5 of the jar file
+ tCreateTriggerReq.setJarMD5(
+ DigestUtils.md5Hex(
+ Files.newInputStream(Paths.get(createTriggerStatement.getJarPath()))));
+ }
+
+ // try to create instance, this request will fail if creation is not successful
+ try (TriggerClassLoader classLoader = new TriggerClassLoader(libRoot)) {
+ Class<?> triggerClass =
+ Class.forName(createTriggerStatement.getClassName(), true, classLoader);
+ Trigger trigger = (Trigger) triggerClass.getDeclaredConstructor().newInstance();
+ tCreateTriggerReq.setFailureStrategy(trigger.getFailureStrategy().getId());
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ LOGGER.warn(
+ "Failed to create trigger when try to create trigger({}) instance first, the cause is: {}",
+ createTriggerStatement.getTriggerName(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to load class '"
+ + createTriggerStatement.getClassName()
+ + "', because it's not found in jar file: "
+ + createTriggerStatement.getJarPath(),
+ TSStatusCode.TRIGGER_LOAD_CLASS.getStatusCode()));
+ return future;
}
final TSStatus executionStatus = client.createTrigger(tCreateTriggerReq);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 55f92de243..e25b510c6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.StartupChecks;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -65,7 +64,6 @@ import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
import org.apache.iotdb.db.sync.SyncService;
-import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -291,7 +289,7 @@ public class DataNode implements DataNodeMBean {
registerManager.register(DriverScheduler.getInstance());
registerUdfServices();
- registerTriggerServices();
+ initTriggerRelatedInstance();
logger.info(
"IoTDB DataNode is setting up, some storage groups may not be ready now, please wait several seconds...");
@@ -378,12 +376,13 @@ public class DataNode implements DataNodeMBean {
config.getSystemDir() + File.separator + "udf" + File.separator));
}
- private void registerTriggerServices() throws StartupException {
- registerManager.register(
- TriggerExecutableManager.setupAndGetInstance(
- config.getTriggerTemporaryLibDir(), config.getTriggerDir()));
- registerManager.register(TriggerClassLoaderManager.setupAndGetInstance(config.getTriggerDir()));
- registerManager.register(TriggerManagementService.setupAndGetInstance());
+ private void initTriggerRelatedInstance() throws StartupException {
+ try {
+ TriggerExecutableManager.setupAndGetInstance(
+ config.getTriggerTemporaryLibDir(), config.getTriggerDir());
+ } catch (IOException e) {
+ throw new StartupException(e);
+ }
}
private void initSchemaEngine() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 4d434f5a3a..f28d85768e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
+import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -96,6 +97,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
@@ -126,8 +128,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -925,7 +925,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws TException {
+ public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) throws TException {
TriggerInformation triggerInformation = TriggerInformation.deserialize(req.triggerInformation);
try {
// set state to INACTIVE when creating trigger instance
@@ -940,7 +940,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
LOGGER.warn(
"Error occurred when creating trigger instance for trigger: {}. The cause is {}.",
triggerInformation.getTriggerName(),
- e.getMessage());
+ e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
@@ -948,11 +948,14 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req) throws TException {
+ public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) throws TException {
try {
TriggerManagementService.getInstance().activeTrigger(req.triggerName);
} catch (Exception e) {
- LOGGER.error("Error occurred during ");
+ LOGGER.error(
+ "Error occurred during active trigger instance for trigger: {}. The cause is {}.",
+ req.triggerName,
+ e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
similarity index 97%
rename from node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
rename to server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
index 9356c23f73..b344ccb338 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.trigger.service;
+package org.apache.iotdb.db.trigger.service;
import org.apache.iotdb.commons.file.SystemFileFactory;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
similarity index 63%
rename from node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
rename to server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
index f94d343ef9..e82c82e558 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/service/TriggerClassLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerClassLoaderManager.java
@@ -17,21 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.commons.trigger.service;
+package org.apache.iotdb.db.trigger.service;
-import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class TriggerClassLoaderManager implements IService {
+public class TriggerClassLoaderManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerClassLoaderManager.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
/** The dir that stores jar files. */
private final String libRoot;
@@ -42,10 +43,10 @@ public class TriggerClassLoaderManager implements IService {
*/
private volatile TriggerClassLoader activeClassLoader;
- private TriggerClassLoaderManager(String libRoot) {
+ private TriggerClassLoaderManager(String libRoot) throws IOException {
this.libRoot = libRoot;
LOGGER.info("Trigger lib root: {}", libRoot);
- activeClassLoader = null;
+ activeClassLoader = new TriggerClassLoader(libRoot);
}
/** Call this method to get up-to-date ClassLoader before registering triggers */
@@ -60,44 +61,24 @@ public class TriggerClassLoaderManager implements IService {
return activeClassLoader;
}
- /////////////////////////////////////////////////////////////////////////////////////////////////
- // IService
- /////////////////////////////////////////////////////////////////////////////////////////////////
-
- @Override
- public void start() throws StartupException {
- try {
- SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
- activeClassLoader = new TriggerClassLoader(libRoot);
- } catch (IOException e) {
- throw new StartupException(this.getID().getName(), e.getMessage());
- }
- }
-
- @Override
- public void stop() {
- // nothing to do
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.TRIGGER_CLASSLOADER_MANAGER_SERVICE;
- }
-
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
- private static TriggerClassLoaderManager INSTANCE = null;
+ private static class TriggerClassLoaderManagerHolder {
+ private static final TriggerClassLoaderManager INSTANCE;
- public static synchronized TriggerClassLoaderManager setupAndGetInstance(String libRoot) {
- if (INSTANCE == null) {
- INSTANCE = new TriggerClassLoaderManager(libRoot);
+ static {
+ try {
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(CONFIG.getTriggerDir());
+ INSTANCE = new TriggerClassLoaderManager(CONFIG.getTriggerDir());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- return INSTANCE;
}
public static TriggerClassLoaderManager getInstance() {
- return INSTANCE;
+ return TriggerClassLoaderManagerHolder.INSTANCE;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index f3904fd71d..319b8bc309 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -19,30 +19,29 @@
package org.apache.iotdb.db.trigger.service;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoader;
-import org.apache.iotdb.commons.trigger.service.TriggerClassLoaderManager;
-import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
-public class TriggerManagementService implements IService {
+public class TriggerManagementService {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManagementService.class);
@@ -52,9 +51,7 @@ public class TriggerManagementService implements IService {
private final Map<String, TriggerExecutor> executorMap;
- private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
- private TDataNodeLocation tDataNodeLocationCache;
+ private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private TriggerManagementService() {
this.lock = new ReentrantLock();
@@ -62,55 +59,108 @@ public class TriggerManagementService implements IService {
this.executorMap = new ConcurrentHashMap<>();
}
- public void acquireRegistrationLock() {
+ public void acquireLock() {
lock.lock();
}
- public void releaseRegistrationLock() {
+ public void releaseLock() {
lock.unlock();
}
- public void register(TriggerInformation triggerInformation) {
+ public void register(TriggerInformation triggerInformation) throws IOException {
try {
- acquireRegistrationLock();
+ acquireLock();
checkIfRegistered(triggerInformation);
doRegister(triggerInformation);
- } catch (Exception e) {
- LOGGER.warn(
- "Failed to register trigger({}) on data node, the cause is: {}",
- triggerInformation.getTriggerName(),
- e.getMessage());
} finally {
- releaseRegistrationLock();
+ releaseLock();
}
- };
+ }
public void activeTrigger(String triggerName) {
- triggerTable.activeTrigger(triggerName);
- };
+ try {
+ acquireLock();
+ triggerTable.setTriggerState(triggerName, TTriggerState.ACTIVE);
+ } finally {
+ releaseLock();
+ }
+ }
- private void checkIfRegistered(TriggerInformation triggerInformation)
- throws TriggerManagementException {
+ public void inactiveTrigger(String triggerName) {
+ try {
+ acquireLock();
+ triggerTable.setTriggerState(triggerName, TTriggerState.INACTIVE);
+ } finally {
+ releaseLock();
+ }
+ }
+
+ private void checkIfRegistered(TriggerInformation triggerInformation) throws IOException {
String triggerName = triggerInformation.getTriggerName();
if (triggerTable.containsTrigger(triggerName)) {
- String errorMessage =
- String.format(
- "Failed to registered trigger %s, "
- + "because trigger %s has already been registered in TriggerTable",
- triggerName, triggerName);
- LOGGER.warn(errorMessage);
- throw new TriggerManagementException(errorMessage);
+ String jarName = triggerInformation.getJarName();
+ if (TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName)) {
+ // A jar with the same name exists, we need to check md5
+ String existedMd5 = "";
+ String md5FilePath = triggerName + ".txt";
+
+ // if meet error when reading md5 from txt, we need to compute it again
+ boolean hasComputed = false;
+ if (TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) {
+ try {
+ existedMd5 =
+ TriggerExecutableManager.getInstance()
+ .readTextFromFileUnderTemporaryRoot(md5FilePath);
+ hasComputed = true;
+ } catch (IOException e) {
+ LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath);
+ }
+ }
+ if (!hasComputed) {
+ try {
+ existedMd5 =
+ DigestUtils.md5Hex(
+ Files.newInputStream(
+ Paths.get(
+ TriggerExecutableManager.getInstance().getLibRoot()
+ + File.separator
+ + triggerInformation.getJarName())));
+ // save the md5 in a txt under trigger temporary lib
+ TriggerExecutableManager.getInstance()
+ .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
+ } catch (IOException e) {
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because error occurred when trying to compute md5 of jar file for trigger %s ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage);
+ throw e;
+ }
+ }
+
+ if (!existedMd5.equals(triggerInformation.getJarFileMD5())) {
+ // same jar name with different md5
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because existed md5 of jar file for trigger %s is different from the new jar file. ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage);
+ throw new TriggerManagementException(errorMessage);
+ }
+ }
}
}
- private void doRegister(TriggerInformation triggerInformation) {
+ private void doRegister(TriggerInformation triggerInformation) throws IOException {
try (TriggerClassLoader currentActiveClassLoader =
TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
String triggerName = triggerInformation.getTriggerName();
triggerTable.addTriggerInformation(triggerName, triggerInformation);
// if it is a stateful trigger, we only create its instance on specified DataNode
if (!triggerInformation.isStateful()
- || triggerInformation.getDataNodeLocation().equals(getTDataNodeLocation())) {
+ || triggerInformation.getDataNodeLocation().getDataNodeId() == DATA_NODE_ID) {
// get trigger instance
Trigger trigger =
constructTriggerInstance(triggerInformation.getClassName(), currentActiveClassLoader);
@@ -122,15 +172,13 @@ public class TriggerManagementService implements IService {
String errorMessage =
String.format(
"Failed to register trigger %s with className: %s. The cause is: %s",
- triggerInformation.getTriggerName(),
- triggerInformation.getClassName(),
- e.getMessage());
+ triggerInformation.getTriggerName(), triggerInformation.getClassName(), e);
LOGGER.warn(errorMessage);
- throw new TriggerManagementException(errorMessage);
+ throw e;
}
}
- private Trigger constructTriggerInstance(String className, TriggerClassLoader classLoader)
+ public Trigger constructTriggerInstance(String className, TriggerClassLoader classLoader)
throws TriggerManagementException {
try {
Class<?> triggerClass = Class.forName(className, true, classLoader);
@@ -146,52 +194,14 @@ public class TriggerManagementService implements IService {
}
}
- private TDataNodeLocation getTDataNodeLocation() {
- if (tDataNodeLocationCache == null) {
- // Set DataNodeLocation
- tDataNodeLocationCache = new TDataNodeLocation();
- tDataNodeLocationCache.setDataNodeId(CONFIG.getDataNodeId());
- tDataNodeLocationCache.setClientRpcEndPoint(
- new TEndPoint(CONFIG.getRpcAddress(), CONFIG.getRpcPort()));
- tDataNodeLocationCache.setInternalEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getInternalPort()));
- tDataNodeLocationCache.setMPPDataExchangeEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getMppDataExchangePort()));
- tDataNodeLocationCache.setDataRegionConsensusEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getDataRegionConsensusPort()));
- tDataNodeLocationCache.setSchemaRegionConsensusEndPoint(
- new TEndPoint(CONFIG.getInternalAddress(), CONFIG.getSchemaRegionConsensusPort()));
- }
- return tDataNodeLocationCache;
- }
-
- @Override
- public void start() throws StartupException {}
-
- @Override
- public void stop() {
- // nothing to do
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.TRIGGER_REGISTRATION_SERVICE;
- }
-
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
-
- private static TriggerManagementService INSTANCE = null;
-
- public static synchronized TriggerManagementService setupAndGetInstance() {
- if (INSTANCE == null) {
- INSTANCE = new TriggerManagementService();
- }
- return INSTANCE;
+ public static TriggerManagementService getInstance() {
+ return TriggerManagementServiceHolder.INSTANCE;
}
- public static TriggerManagementService getInstance() {
- return INSTANCE;
+ private static class TriggerManagementServiceHolder {
+ private static final TriggerManagementService INSTANCE = new TriggerManagementService();
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6aaeb8c335..2b010489d5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -75,6 +75,8 @@ public enum TSStatusCode {
VERIFY_METADATA_ERROR(343),
MEASUREMENT_IN_BLACK_LIST(344),
COLOSSAL_RECORD(349),
+ TRIGGER_LOAD_CLASS(360),
+ TRIGGER_DOWNLOAD_ERROR(361),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 2f89ba994e..b956c66bc7 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -264,7 +264,9 @@ struct TCreateTriggerReq {
6: required byte triggerType
7: required binary pathPattern,
8: required map<string, string> attributes,
- 9: optional binary jarFile
+ 9: optional binary jarFile,
+ 10: optional string jarMD5,
+ 11: required i32 failureStrategy
}
struct TDropTriggerReq {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 2fc8d346ef..12d3a41ddb 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -169,12 +169,12 @@ struct TDropFunctionRequest {
1: required string udfName
}
-struct TcreateTriggerInstanceReq {
+struct TCreateTriggerInstanceReq {
1: required binary triggerInformation
2: required binary jarFile
}
-struct TactiveTriggerInstanceReq {
+struct TActiveTriggerInstanceReq {
1: required string triggerName
}
@@ -421,14 +421,14 @@ service IDataNodeRPCService {
*
* @param TriggerInformation, jar file.
**/
- common.TSStatus createTriggerInstance(TcreateTriggerInstanceReq req)
+ common.TSStatus createTriggerInstance(TCreateTriggerInstanceReq req)
/**
* Config node will active a trigger instance on data node.
*
* @param trigger name.
**/
- common.TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req)
+ common.TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req)
/**
* Config node will drop a trigger on all online config nodes and data nodes.
diff --git a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
index 5920f33996..6f3faf342e 100644
--- a/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
+++ b/trigger-api/src/main/java/org/apache/iotdb/trigger/api/enums/FailureStrategy.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.trigger.api.enums;
import org.apache.iotdb.trigger.api.Trigger;
@@ -34,16 +33,34 @@ public enum FailureStrategy {
* not have any influence on the triggers that have not been fired. The failure of this Trigger
* will be simply ignored.
*/
- OPTIMISTIC,
+ OPTIMISTIC(0),
/**
- * If this strategy were adopted, the failure of {@link Trigger#fire(Tablet)} of one Tablet would
- * throw an exception and end this insertion. If a PESSIMISTIC trigger whose TRIGGER_EVENT is
- * {@link TriggerEvent#BEFORE_INSERT} fails to fire in an insertion, all the triggers that have
- * not fired will not be fired in this insertion and this insertion will not be executed. if a
- * PESSIMISTIC trigger whose TRIGGER_EVENT is {@link TriggerEvent#AFTER_INSERT} fails to fire in
- * an insertion, all the triggers that have not fired will not be fired, and this insertion will
- * be marked as failed even if the insertion itself executed successfully.
+ * If this strategy were adopted, the failure of {@link Trigger#fire(Tablet)} of one Tablet
+ * would @@ -45,5 +45,26 @@ public enum FailureStrategy { an insertion, all the triggers that have
+ * not fired will not be fired, and this insertion will be marked as failed even if the insertion
+ * itself executed successfully.
*/
- PESSIMISTIC,
+ PESSIMISTIC(1);
+
+ private final int id;
+
+ FailureStrategy(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public static FailureStrategy construct(int id) {
+ switch (id) {
+ case 0:
+ return FailureStrategy.OPTIMISTIC;
+ case 1:
+ return FailureStrategy.PESSIMISTIC;
+ default:
+ throw new UnsupportedOperationException("Unsupported FailureStrategy Type.");
+ }
+ }
}