You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/30 03:11:54 UTC
[iotdb] 04/05: done
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-3228
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8cafea9f2091e0880560fa17d084ee2ead8615ea
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 30 11:08:48 2022 +0800
done
---
.../confignode/client/AsyncDataNodeClientPool.java | 19 ++++-
...Handler.java => FunctionManagementHandler.java} | 6 +-
.../consensus/request/ConfigRequest.java | 4 +
.../consensus/request/ConfigRequestType.java | 1 +
.../consensus/request/write/DropFunctionReq.java | 56 ++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 8 ++
.../apache/iotdb/confignode/manager/Manager.java | 2 +
.../iotdb/confignode/manager/UDFManager.java | 57 +++++++++++--
.../iotdb/confignode/persistence/UDFInfo.java | 18 +++-
.../executor/ConfigRequestExecutor.java | 9 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++
.../udf/service/UDFRegistrationService.java | 2 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ++++
.../plan/execution/config/ConfigTaskVisitor.java | 7 ++
.../plan/execution/config/DropFunctionTask.java | 99 ++++++++++++++++++++++
15 files changed, 294 insertions(+), 17 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 17d93b8f85..a38218ba04 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -25,13 +25,14 @@ import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -241,7 +242,7 @@ public class AsyncDataNodeClientPool {
* @param endPoint The specific DataNode
*/
public void createFunction(
- TEndPoint endPoint, TCreateFunctionRequest request, CreateFunctionHandler handler) {
+ TEndPoint endPoint, TCreateFunctionRequest request, FunctionManagementHandler handler) {
try {
clientManager.borrowClient(endPoint).createFunction(request, handler);
} catch (Exception e) {
@@ -249,6 +250,20 @@ public class AsyncDataNodeClientPool {
}
}
+ /**
+ * Only used in UDFManager
+ *
+ * @param endPoint The specific DataNode
+ */
+ public void dropFunction(
+ TEndPoint endPoint, TDropFunctionRequest request, FunctionManagementHandler handler) {
+ try {
+ clientManager.borrowClient(endPoint).dropFunction(request, handler);
+ } catch (Exception e) {
+ LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
+ }
+ }
+
// TODO: Is the ClientPool must be a singleton?
private static class ClientPoolHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
similarity index 89%
rename from confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
index e740a5c6a9..6ec4b4f06f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/FunctionManagementHandler.java
@@ -27,14 +27,14 @@ import org.apache.thrift.async.AsyncMethodCallback;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> {
+public class FunctionManagementHandler implements AsyncMethodCallback<TSStatus> {
private final CountDownLatch countDownLatch;
private final List<TSStatus> dataNodeResponseStatus;
private final String ip;
private final int port;
- public CreateFunctionHandler(
+ public FunctionManagementHandler(
CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) {
this.countDownLatch = countDownLatch;
this.dataNodeResponseStatus = dataNodeResponseStatus;
@@ -53,6 +53,6 @@ public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> {
countDownLatch.countDown();
dataNodeResponseStatus.add(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage("[" + ip + ":" + port + "] " + exception.getMessage()));
+ .setMessage("DataNode[" + ip + ":" + port + "] " + exception.getMessage()));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 92228a7d26..83864e30dc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartition
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
@@ -191,6 +192,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
case CreateFunction:
req = new CreateFunctionReq();
break;
+ case DropFunction:
+ req = new DropFunctionReq();
+ break;
case GetNodePathsPartition:
req = new GetNodePathsPartitionReq();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index f0cb20ea8c..a8f122e412 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -60,5 +60,6 @@ public enum ConfigRequestType {
ListRoleUsers,
ApplyConfigNode,
CreateFunction,
+ DropFunction,
GetNodePathsPartition;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DropFunctionReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DropFunctionReq.java
new file mode 100644
index 0000000000..028660dbe4
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/DropFunctionReq.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DropFunctionReq extends ConfigRequest {
+
+ private String functionName;
+
+ public DropFunctionReq() {
+ super(ConfigRequestType.DropFunction);
+ }
+
+ public DropFunctionReq(String functionName) {
+ super(ConfigRequestType.DropFunction);
+ this.functionName = functionName;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ @Override
+ protected void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(getType().ordinal());
+ ReadWriteIOUtils.write(functionName, buffer);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ functionName = ReadWriteIOUtils.readString(buffer);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 89ce0e1562..11ec6e2abd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -612,6 +612,14 @@ public class ConfigManager implements Manager {
: status;
}
+ @Override
+ public TSStatus dropFunction(String udfName) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? udfManager.dropFunction(udfName)
+ : status;
+ }
+
@Override
public UDFManager getUDFManager() {
return udfManager;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index b689f52258..ef930f3238 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -220,4 +220,6 @@ public interface Manager {
TSStatus applyConfigNode(ApplyConfigNodeReq applyConfigNodeReq);
TSStatus createFunction(String udfName, String className, List<String> uris);
+
+ TSStatus dropFunction(String udfName);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index a3f7b03034..4d48361334 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -23,10 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
+import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -63,8 +65,7 @@ public class UDFManager {
return configNodeStatus;
}
- return squashDataNodeResponseStatusList(
- createFunctionOnDataNodes(functionName, className, uris));
+ return squashResponseStatusList(createFunctionOnDataNodes(functionName, className, uris));
} catch (Exception e) {
final String errorMessage =
String.format(
@@ -92,7 +93,7 @@ public class UDFManager {
.createFunction(
endPoint,
request,
- new CreateFunctionHandler(
+ new FunctionManagementHandler(
countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
}
@@ -106,9 +107,53 @@ public class UDFManager {
return dataNodeResponseStatus;
}
- private TSStatus squashDataNodeResponseStatusList(List<TSStatus> dataNodeResponseStatusList) {
+ public TSStatus dropFunction(String functionName) {
+ try {
+ final List<TSStatus> nodeResponseList = dropFunctionOnDataNodes(functionName);
+ final TSStatus configNodeStatus =
+ configManager.getConsensusManager().write(new DropFunctionReq(functionName)).getStatus();
+ nodeResponseList.add(configNodeStatus);
+ return squashResponseStatusList(nodeResponseList);
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format("Failed to deregister UDF %s, because of exception: %s", functionName, e);
+ LOGGER.warn(errorMessage);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(errorMessage);
+ }
+ }
+
+ private List<TSStatus> dropFunctionOnDataNodes(String functionName) {
+ final List<TDataNodeInfo> onlineDataNodes =
+ configManager.getNodeManager().getOnlineDataNodes(-1);
+ final List<TSStatus> dataNodeResponseStatus =
+ Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
+ final CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size());
+ final TDropFunctionRequest request = new TDropFunctionRequest(functionName);
+
+ for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
+ AsyncDataNodeClientPool.getInstance()
+ .dropFunction(
+ endPoint,
+ request,
+ new FunctionManagementHandler(
+ countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+ }
+
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("UDFManager was interrupted during dropping functions on data nodes", e);
+ }
+
+ return dataNodeResponseStatus;
+ }
+
+ private TSStatus squashResponseStatusList(List<TSStatus> responseStatusList) {
final List<TSStatus> failedStatus =
- dataNodeResponseStatusList.stream()
+ responseStatusList.stream()
.filter(status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
.collect(Collectors.toList());
return failedStatus.isEmpty()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index f0aa594d44..7633d92494 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -95,7 +96,7 @@ public class UDFInfo implements SnapshotProcessor {
} catch (Exception e) {
final String errorMessage =
String.format(
- "Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
+ "[ConfigNode] Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
functionName, className, uris, e);
LOGGER.warn(errorMessage, e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -103,6 +104,21 @@ public class UDFInfo implements SnapshotProcessor {
}
}
+ public synchronized TSStatus dropFunction(DropFunctionReq req) {
+ try {
+ udfRegistrationService.deregister(req.getFunctionName());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "[ConfigNode] Failed to deregister UDF %s, because of exception: %s",
+ req.getFunctionName(), e);
+ LOGGER.warn(errorMessage, e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(errorMessage);
+ }
+ }
+
@Override
public synchronized boolean processTakeSnapshot(File snapshotDir) throws IOException {
return udfExecutableManager.processTakeSnapshot(snapshotDir)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 5db1567af4..1fe780ef62 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartition
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupReq;
+import org.apache.iotdb.confignode.consensus.request.write.DropFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorReq;
@@ -187,6 +188,8 @@ public class ConfigRequestExecutor {
return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
case CreateFunction:
return udfInfo.createFunction((CreateFunctionReq) req);
+ case DropFunction:
+ return udfInfo.dropFunction((DropFunctionReq) req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
@@ -216,8 +219,7 @@ public class ConfigRequestExecutor {
}
AtomicBoolean result = new AtomicBoolean(true);
- getAllAttributes()
- .parallelStream()
+ getAllAttributes().parallelStream()
.forEach(
x -> {
boolean takeSnapshotResult = true;
@@ -246,8 +248,7 @@ public class ConfigRequestExecutor {
return;
}
- getAllAttributes()
- .parallelStream()
+ getAllAttributes().parallelStream()
.forEach(
x -> {
try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 1d46688414..045a8e3b1f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -388,6 +389,11 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
return configManager.createFunction(req.getUdfName(), req.getClassName(), req.getUris());
}
+ @Override
+ public TSStatus dropFunction(TDropFunctionReq req) throws TException {
+ return configManager.dropFunction(req.getUdfName());
+ }
+
public void handleClientExit() {}
// TODO: Interfaces for data operations
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index 46a37eb356..a6214edf1c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -260,7 +260,7 @@ public class UDFRegistrationService implements IService, SnapshotProcessor {
if (information.isBuiltin()) {
String errorMessage =
String.format("Built-in function %s can not be deregistered.", functionName);
- LOGGER.error(errorMessage);
+ LOGGER.warn(errorMessage);
throw new UDFRegistrationException(errorMessage);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 2e0ce66ead..563897078d 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -605,6 +606,22 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus dropFunction(TDropFunctionReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.dropFunction(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
public static class Factory extends BaseClientFactory<PartitionRegionId, ConfigNodeClient> {
public Factory(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index ebd9327e71..7e6038049b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
@@ -96,5 +97,11 @@ public class ConfigTaskVisitor
return new CreateFunctionTask(createFunctionStatement);
}
+ @Override
+ public IConfigTask visitDropFunction(
+ DropFunctionStatement dropFunctionStatement, TaskContext context) {
+ return new DropFunctionTask(dropFunctionStatement);
+ }
+
public static class TaskContext {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/DropFunctionTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/DropFunctionTask.java
new file mode 100644
index 0000000000..6f67983284
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/DropFunctionTask.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DropFunctionTask implements IConfigTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DropFunctionTask.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ private final String udfName;
+
+ public DropFunctionTask(DropFunctionStatement dropFunctionStatement) {
+ udfName = dropFunctionStatement.getUdfName();
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(
+ IClientManager<PartitionRegionId, ConfigNodeClient> clientManager)
+ throws InterruptedException {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ if (CONFIG.isClusterMode()) {
+ executeCluster(clientManager, future);
+ } else {
+ executeStandalone(future);
+ }
+ return future;
+ }
+
+ private void executeCluster(
+ IClientManager<PartitionRegionId, ConfigNodeClient> clientManager,
+ SettableFuture<ConfigTaskResult> future) {
+ try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ final TSStatus executionStatus = client.dropFunction(new TDropFunctionReq(udfName));
+
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.error("[{}] Failed to drop function {} in config node.", executionStatus, udfName);
+ future.setException(new StatementExecutionException(executionStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (TException | IOException e) {
+ LOGGER.error("Failed to connect to config node.");
+ future.setException(e);
+ }
+ }
+
+ private void executeStandalone(SettableFuture<ConfigTaskResult> future) {
+ try {
+ UDFRegistrationService.getInstance().deregister(udfName);
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (Exception e) {
+ final String message =
+ String.format("Failed to drop function %s, because %s.", udfName, e.getMessage());
+ LOGGER.error(message, e);
+ future.setException(
+ new StatementExecutionException(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(message)));
+ }
+ }
+}