You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/23 17:38:23 UTC

[iotdb] 09/18: ConfigManager ConfigRequestExecutor

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-3227
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d7a195f81917b1201f9efe00fc70c6f2c8d2af5e
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 19 16:56:38 2022 +0800

    ConfigManager ConfigRequestExecutor
---
 .../consensus/request/ConfigRequest.java           |  3 +
 .../consensus/request/ConfigRequestType.java       |  3 +-
 .../CreateFunctionReq.java}                        | 60 ++++++-------------
 .../iotdb/confignode/manager/ConfigManager.java    | 13 ++++-
 .../UDFManager.java}                               | 61 +++++++-------------
 .../UDFInfo.java}                                  | 67 ++++++++--------------
 .../executor/ConfigRequestExecutor.java            | 17 ++++--
 7 files changed, 92 insertions(+), 132 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index dcef6a91f8..2f4220198a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -182,6 +183,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
         case ApplyConfigNode:
           req = new ApplyConfigNodeReq();
           break;
+        case CreateFunction:
+          req = new CreateFunctionReq();
+          break;
         default:
           throw new IOException("unknown PhysicalPlan type: " + typeNum);
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index d0fd464f4f..d3844f22c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -57,5 +57,6 @@ public enum ConfigRequestType {
   ListRolePrivilege,
   ListUserRoles,
   ListRoleUsers,
-  ApplyConfigNode
+  ApplyConfigNode,
+  CreateFunction,
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
index d0fd464f4f..2cfe6ffe44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
@@ -16,46 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request;
 
-public enum ConfigRequestType {
-  RegisterDataNode,
-  GetDataNodeInfo,
-  SetStorageGroup,
-  SetTTL,
-  SetSchemaReplicationFactor,
-  SetDataReplicationFactor,
-  SetTimePartitionInterval,
-  DeleteStorageGroup,
-  GetStorageGroup,
-  CountStorageGroup,
-  CreateRegions,
-  DeleteRegions,
-  GetSchemaPartition,
-  CreateSchemaPartition,
-  GetOrCreateSchemaPartition,
-  GetDataPartition,
-  CreateDataPartition,
-  GetOrCreateDataPartition,
-  UpdateProcedure,
-  DeleteProcedure,
-  Author,
-  CreateUser,
-  CreateRole,
-  DropUser,
-  DropRole,
-  GrantRole,
-  GrantUser,
-  GrantRoleToUser,
-  RevokeUser,
-  RevokeRole,
-  RevokeRoleFromUser,
-  UpdateUser,
-  ListUser,
-  ListRole,
-  ListUserPrivilege,
-  ListRolePrivilege,
-  ListUserRoles,
-  ListRoleUsers,
-  ApplyConfigNode
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CreateFunctionReq extends ConfigRequest {
+  // Tags the request as ConfigRequestType.CreateFunction; the class declares no fields here.
+  public CreateFunctionReq() {
+    super(ConfigRequestType.CreateFunction);
+  }
+  // Empty body: nothing is written to the buffer for this request.
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {}
+  // Empty body: nothing is read back from the buffer for this request.
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 87d44c5174..4e6267cae9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -99,6 +100,9 @@ public class ConfigManager implements Manager {
   /** Manage procedure */
   private final ProcedureManager procedureManager;
 
+  /** UDF */
+  private final UDFManager udfManager;
+
   public ConfigManager() throws IOException {
     // Build the persistence module
     NodeInfo nodeInfo = new NodeInfo();
@@ -106,11 +110,12 @@ public class ConfigManager implements Manager {
     PartitionInfo partitionInfo = new PartitionInfo();
     AuthorInfo authorInfo = new AuthorInfo();
     ProcedureInfo procedureInfo = new ProcedureInfo();
+    UDFInfo udfInfo = new UDFInfo();
 
     // Build state machine and executor
     ConfigRequestExecutor executor =
         new ConfigRequestExecutor(
-            nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo);
+            nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo, udfInfo);
     PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor);
 
     // Build the manager module
@@ -119,6 +124,7 @@ public class ConfigManager implements Manager {
     this.partitionManager = new PartitionManager(this, partitionInfo);
     this.permissionManager = new PermissionManager(this, authorInfo);
     this.procedureManager = new ProcedureManager(this, procedureInfo);
+    this.udfManager = new UDFManager(this, udfInfo);
     this.loadManager = new LoadManager(this);
     this.consensusManager = new ConsensusManager(stateMachine);
 
@@ -558,7 +564,10 @@ public class ConfigManager implements Manager {
 
   @Override
   public TSStatus createFunction(String udfName, String className, List<String> uris) {
-    throw new UnsupportedOperationException();
+    TSStatus status = confirmLeader(); // delegate to udfManager only on SUCCESS_STATUS
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? udfManager.createFunction(udfName, className, uris)
+        : status; // otherwise return confirmLeader()'s status unchanged
   }
 
   public ProcedureManager getProcedureManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index d0fd464f4f..b5fbc250aa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -16,46 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request;
 
-public enum ConfigRequestType {
-  RegisterDataNode,
-  GetDataNodeInfo,
-  SetStorageGroup,
-  SetTTL,
-  SetSchemaReplicationFactor,
-  SetDataReplicationFactor,
-  SetTimePartitionInterval,
-  DeleteStorageGroup,
-  GetStorageGroup,
-  CountStorageGroup,
-  CreateRegions,
-  DeleteRegions,
-  GetSchemaPartition,
-  CreateSchemaPartition,
-  GetOrCreateSchemaPartition,
-  GetDataPartition,
-  CreateDataPartition,
-  GetOrCreateDataPartition,
-  UpdateProcedure,
-  DeleteProcedure,
-  Author,
-  CreateUser,
-  CreateRole,
-  DropUser,
-  DropRole,
-  GrantRole,
-  GrantUser,
-  GrantRoleToUser,
-  RevokeUser,
-  RevokeRole,
-  RevokeRoleFromUser,
-  UpdateUser,
-  ListUser,
-  ListRole,
-  ListUserPrivilege,
-  ListRolePrivilege,
-  ListUserRoles,
-  ListRoleUsers,
-  ApplyConfigNode
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
+
+import java.util.List;
+
+public class UDFManager {
+  // Both references are stored by the constructor; no method in this class reads them yet.
+  private final ConfigManager configManager;
+  private final UDFInfo udfInfo;
+  // Keeps the supplied references as-is and does nothing else.
+  public UDFManager(ConfigManager configManager, UDFInfo udfInfo) {
+    this.configManager = configManager;
+    this.udfInfo = udfInfo;
+  }
+  // NOTE(review): stub; ignores every argument and returns a default-constructed TSStatus.
+  public TSStatus createFunction(String udfName, String className, List<String> uris) {
+    return new TSStatus();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
similarity index 50%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index d0fd464f4f..4ba470d7c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -16,46 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request;
-
-public enum ConfigRequestType {
-  RegisterDataNode,
-  GetDataNodeInfo,
-  SetStorageGroup,
-  SetTTL,
-  SetSchemaReplicationFactor,
-  SetDataReplicationFactor,
-  SetTimePartitionInterval,
-  DeleteStorageGroup,
-  GetStorageGroup,
-  CountStorageGroup,
-  CreateRegions,
-  DeleteRegions,
-  GetSchemaPartition,
-  CreateSchemaPartition,
-  GetOrCreateSchemaPartition,
-  GetDataPartition,
-  CreateDataPartition,
-  GetOrCreateDataPartition,
-  UpdateProcedure,
-  DeleteProcedure,
-  Author,
-  CreateUser,
-  CreateRole,
-  DropUser,
-  DropRole,
-  GrantRole,
-  GrantUser,
-  GrantRoleToUser,
-  RevokeUser,
-  RevokeRole,
-  RevokeRoleFromUser,
-  UpdateUser,
-  ListUser,
-  ListRole,
-  ListUserPrivilege,
-  ListRolePrivilege,
-  ListUserRoles,
-  ListRoleUsers,
-  ApplyConfigNode
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+
+public class UDFInfo implements SnapshotProcessor {
+  // Not implemented: always throws UnsupportedOperationException, regardless of req.
+  public TSStatus createFunction(CreateFunctionReq req) {
+    throw new UnsupportedOperationException();
+  }
+  // NOTE(review): always returns false; callers may treat that as a failed snapshot (confirm).
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
+    return false;
+  }
+  // Stub: does nothing with snapshotDir.
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {}
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index 4b7bc5d7ce..42be0cd33f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -48,6 +49,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -75,17 +77,21 @@ public class ConfigRequestExecutor {
 
   private final ProcedureInfo procedureInfo;
 
+  private final UDFInfo udfInfo;
+
   public ConfigRequestExecutor(
       NodeInfo nodeInfo,
       ClusterSchemaInfo clusterSchemaInfo,
       PartitionInfo partitionInfo,
       AuthorInfo authorInfo,
-      ProcedureInfo procedureInfo) {
+      ProcedureInfo procedureInfo,
+      UDFInfo udfInfo) {
     this.nodeInfo = nodeInfo;
     this.clusterSchemaInfo = clusterSchemaInfo;
     this.partitionInfo = partitionInfo;
     this.authorInfo = authorInfo;
     this.procedureInfo = procedureInfo;
+    this.udfInfo = udfInfo;
   }
 
   public DataSet executorQueryPlan(ConfigRequest req)
@@ -168,6 +174,8 @@ public class ConfigRequestExecutor {
         return authorInfo.authorNonQuery((AuthorReq) req);
       case ApplyConfigNode:
         return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
+      case CreateFunction:
+        return udfInfo.createFunction((CreateFunctionReq) req);
       default:
         throw new UnknownPhysicalPlanTypeException(req.getType());
     }
@@ -197,8 +205,7 @@ public class ConfigRequestExecutor {
     }
 
     AtomicBoolean result = new AtomicBoolean(true);
-    getAllAttributes()
-        .parallelStream()
+    getAllAttributes().parallelStream()
         .forEach(
             x -> {
               boolean takeSnapshotResult = true;
@@ -227,8 +234,7 @@ public class ConfigRequestExecutor {
       return;
     }
 
-    getAllAttributes()
-        .parallelStream()
+    getAllAttributes().parallelStream()
         .forEach(
             x -> {
               try {
@@ -244,6 +250,7 @@ public class ConfigRequestExecutor {
     allAttributes.add(clusterSchemaInfo);
     allAttributes.add(partitionInfo);
     allAttributes.add(nodeInfo);
+    allAttributes.add(udfInfo);
     return allAttributes;
   }
 }