You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/23 17:38:23 UTC
[iotdb] 09/18: ConfigManager ConfigRequestExecutor
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d7a195f81917b1201f9efe00fc70c6f2c8d2af5e
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 19 16:56:38 2022 +0800
ConfigManager ConfigRequestExecutor
---
.../consensus/request/ConfigRequest.java | 3 +
.../consensus/request/ConfigRequestType.java | 3 +-
.../CreateFunctionReq.java} | 60 ++++++-------------
.../iotdb/confignode/manager/ConfigManager.java | 13 ++++-
.../UDFManager.java} | 61 +++++++-------------
.../UDFInfo.java} | 67 ++++++++--------------
.../executor/ConfigRequestExecutor.java | 17 ++++--
7 files changed, 92 insertions(+), 132 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index dcef6a91f8..2f4220198a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -182,6 +183,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
case ApplyConfigNode:
req = new ApplyConfigNodeReq();
break;
+ case CreateFunction:
+ req = new CreateFunctionReq();
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index d0fd464f4f..d3844f22c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -57,5 +57,6 @@ public enum ConfigRequestType {
ListRolePrivilege,
ListUserRoles,
ListRoleUsers,
- ApplyConfigNode
+ ApplyConfigNode,
+ CreateFunction,
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
index d0fd464f4f..2cfe6ffe44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
@@ -16,46 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request;
-public enum ConfigRequestType {
- RegisterDataNode,
- GetDataNodeInfo,
- SetStorageGroup,
- SetTTL,
- SetSchemaReplicationFactor,
- SetDataReplicationFactor,
- SetTimePartitionInterval,
- DeleteStorageGroup,
- GetStorageGroup,
- CountStorageGroup,
- CreateRegions,
- DeleteRegions,
- GetSchemaPartition,
- CreateSchemaPartition,
- GetOrCreateSchemaPartition,
- GetDataPartition,
- CreateDataPartition,
- GetOrCreateDataPartition,
- UpdateProcedure,
- DeleteProcedure,
- Author,
- CreateUser,
- CreateRole,
- DropUser,
- DropRole,
- GrantRole,
- GrantUser,
- GrantRoleToUser,
- RevokeUser,
- RevokeRole,
- RevokeRoleFromUser,
- UpdateUser,
- ListUser,
- ListRole,
- ListUserPrivilege,
- ListRolePrivilege,
- ListUserRoles,
- ListRoleUsers,
- ApplyConfigNode
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CreateFunctionReq extends ConfigRequest {
+
+ public CreateFunctionReq() {
+ super(ConfigRequestType.CreateFunction);
+ }
+
+ @Override
+ protected void serializeImpl(ByteBuffer buffer) {}
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 87d44c5174..4e6267cae9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -99,6 +100,9 @@ public class ConfigManager implements Manager {
/** Manage procedure */
private final ProcedureManager procedureManager;
+ /** UDF */
+ private final UDFManager udfManager;
+
public ConfigManager() throws IOException {
// Build the persistence module
NodeInfo nodeInfo = new NodeInfo();
@@ -106,11 +110,12 @@ public class ConfigManager implements Manager {
PartitionInfo partitionInfo = new PartitionInfo();
AuthorInfo authorInfo = new AuthorInfo();
ProcedureInfo procedureInfo = new ProcedureInfo();
+ UDFInfo udfInfo = new UDFInfo();
// Build state machine and executor
ConfigRequestExecutor executor =
new ConfigRequestExecutor(
- nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo);
+ nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo, udfInfo);
PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor);
// Build the manager module
@@ -119,6 +124,7 @@ public class ConfigManager implements Manager {
this.partitionManager = new PartitionManager(this, partitionInfo);
this.permissionManager = new PermissionManager(this, authorInfo);
this.procedureManager = new ProcedureManager(this, procedureInfo);
+ this.udfManager = new UDFManager(this, udfInfo);
this.loadManager = new LoadManager(this);
this.consensusManager = new ConsensusManager(stateMachine);
@@ -558,7 +564,10 @@ public class ConfigManager implements Manager {
@Override
public TSStatus createFunction(String udfName, String className, List<String> uris) {
- throw new UnsupportedOperationException();
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? udfManager.createFunction(udfName, className, uris)
+ : status;
}
public ProcedureManager getProcedureManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index d0fd464f4f..b5fbc250aa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -16,46 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request;
-public enum ConfigRequestType {
- RegisterDataNode,
- GetDataNodeInfo,
- SetStorageGroup,
- SetTTL,
- SetSchemaReplicationFactor,
- SetDataReplicationFactor,
- SetTimePartitionInterval,
- DeleteStorageGroup,
- GetStorageGroup,
- CountStorageGroup,
- CreateRegions,
- DeleteRegions,
- GetSchemaPartition,
- CreateSchemaPartition,
- GetOrCreateSchemaPartition,
- GetDataPartition,
- CreateDataPartition,
- GetOrCreateDataPartition,
- UpdateProcedure,
- DeleteProcedure,
- Author,
- CreateUser,
- CreateRole,
- DropUser,
- DropRole,
- GrantRole,
- GrantUser,
- GrantRoleToUser,
- RevokeUser,
- RevokeRole,
- RevokeRoleFromUser,
- UpdateUser,
- ListUser,
- ListRole,
- ListUserPrivilege,
- ListRolePrivilege,
- ListUserRoles,
- ListRoleUsers,
- ApplyConfigNode
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
+
+import java.util.List;
+
+public class UDFManager {
+
+ private final ConfigManager configManager;
+ private final UDFInfo udfInfo;
+
+ public UDFManager(ConfigManager configManager, UDFInfo udfInfo) {
+ this.configManager = configManager;
+ this.udfInfo = udfInfo;
+ }
+
+ public TSStatus createFunction(String udfName, String className, List<String> uris) {
+ return new TSStatus();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index d0fd464f4f..4ba470d7c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -16,46 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request;
-
-public enum ConfigRequestType {
- RegisterDataNode,
- GetDataNodeInfo,
- SetStorageGroup,
- SetTTL,
- SetSchemaReplicationFactor,
- SetDataReplicationFactor,
- SetTimePartitionInterval,
- DeleteStorageGroup,
- GetStorageGroup,
- CountStorageGroup,
- CreateRegions,
- DeleteRegions,
- GetSchemaPartition,
- CreateSchemaPartition,
- GetOrCreateSchemaPartition,
- GetDataPartition,
- CreateDataPartition,
- GetOrCreateDataPartition,
- UpdateProcedure,
- DeleteProcedure,
- Author,
- CreateUser,
- CreateRole,
- DropUser,
- DropRole,
- GrantRole,
- GrantUser,
- GrantRoleToUser,
- RevokeUser,
- RevokeRole,
- RevokeRoleFromUser,
- UpdateUser,
- ListUser,
- ListRole,
- ListUserPrivilege,
- ListRolePrivilege,
- ListUserRoles,
- ListRoleUsers,
- ApplyConfigNode
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+
+public class UDFInfo implements SnapshotProcessor {
+
+ public TSStatus createFunction(CreateFunctionReq req) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+ return true; // no UDF state to persist yet; false presumably fails ConfigRequestExecutor's combined snapshot
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException, IOException {}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 4b7bc5d7ce..42be0cd33f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -48,6 +49,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -75,17 +77,21 @@ public class ConfigRequestExecutor {
private final ProcedureInfo procedureInfo;
+ private final UDFInfo udfInfo;
+
public ConfigRequestExecutor(
NodeInfo nodeInfo,
ClusterSchemaInfo clusterSchemaInfo,
PartitionInfo partitionInfo,
AuthorInfo authorInfo,
- ProcedureInfo procedureInfo) {
+ ProcedureInfo procedureInfo,
+ UDFInfo udfInfo) {
this.nodeInfo = nodeInfo;
this.clusterSchemaInfo = clusterSchemaInfo;
this.partitionInfo = partitionInfo;
this.authorInfo = authorInfo;
this.procedureInfo = procedureInfo;
+ this.udfInfo = udfInfo;
}
public DataSet executorQueryPlan(ConfigRequest req)
@@ -168,6 +174,8 @@ public class ConfigRequestExecutor {
return authorInfo.authorNonQuery((AuthorReq) req);
case ApplyConfigNode:
return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
+ case CreateFunction:
+ return udfInfo.createFunction((CreateFunctionReq) req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
@@ -197,8 +205,7 @@ public class ConfigRequestExecutor {
}
AtomicBoolean result = new AtomicBoolean(true);
- getAllAttributes()
- .parallelStream()
+ getAllAttributes().parallelStream()
.forEach(
x -> {
boolean takeSnapshotResult = true;
@@ -227,8 +234,7 @@ public class ConfigRequestExecutor {
return;
}
- getAllAttributes()
- .parallelStream()
+ getAllAttributes().parallelStream()
.forEach(
x -> {
try {
@@ -244,6 +250,7 @@ public class ConfigRequestExecutor {
allAttributes.add(clusterSchemaInfo);
allAttributes.add(partitionInfo);
allAttributes.add(nodeInfo);
+ allAttributes.add(udfInfo);
return allAttributes;
}
}