You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/18 07:20:20 UTC

[iotdb] branch stable-mpp created (now d805e3d3bd)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d805e3d3bd Combine mpp_port with internal_port

This branch includes the following new commits:

     new d805e3d3bd Combine mpp_port with internal_port

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Combine mpp_port with internal_port

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d805e3d3bdcd29a4bfef63ab964b575faa065216
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 18 15:20:00 2022 +0800

    Combine mpp_port with internal_port
---
 .../iotdb/confignode/cli/TemporaryClient.java      |  10 +-
 .../resources/conf/iotdb-engine.properties         |   3 -
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 --
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 -
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   2 +-
 .../scheduler/AbstractFragInsStateTracker.java     |   5 +-
 .../scheduler/InternalServiceClientFactory.java    |   7 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   5 +-
 .../execution/scheduler/SimpleQueryTerminator.java |   5 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   6 +-
 .../org/apache/iotdb/db/service/DataNodeMBean.java |  21 ----
 .../iotdb/db/service/DataNodeManagementServer.java | 102 -------------------
 .../db/service/DataNodeManagementServerMBean.java  |  22 ----
 .../apache/iotdb/db/service/InternalService.java   |   4 +-
 .../iotdb/db/service/InternalServiceImpl.java      | 112 ---------------------
 .../handler/DataNodeManagementServiceHandler.java  |  58 -----------
 .../handler}/InternalServiceThriftHandler.java     |   2 +-
 ...ntServiceImpl.java => InternalServiceImpl.java} |  84 ++++++++++++++--
 .../iotdb/db/service/InternalServiceImplTest.java  |   1 +
 thrift/src/main/thrift/management.thrift           |  77 --------------
 thrift/src/main/thrift/mpp.thrift                  |  58 ++++++++++-
 21 files changed, 158 insertions(+), 441 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
index 787d07b579..052abbb8f1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
+import org.apache.iotdb.mpp.rpc.thrift.CreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.CreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.CreateDataRegionReq;
-import org.apache.iotdb.service.rpc.thrift.CreateSchemaRegionReq;
-import org.apache.iotdb.service.rpc.thrift.ManagementIService;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -53,7 +53,7 @@ public class TemporaryClient {
   private static final int retryNum = 3;
 
   // Map<DataNodeId, ManagementIService.Client>
-  private final Map<Integer, ManagementIService.Client> clients;
+  private final Map<Integer, InternalService.Iface> clients;
 
   private TemporaryClient() {
     this.clients = new HashMap<>();
@@ -66,7 +66,7 @@ public class TemporaryClient {
             RpcTransportFactory.INSTANCE.getTransport(
                 endpoint.getIp(), endpoint.getPort(), timeOutInMS);
         transport.open();
-        clients.put(dataNodeId, new ManagementIService.Client(new TBinaryProtocol(transport)));
+        clients.put(dataNodeId, new InternalService.Client(new TBinaryProtocol(transport)));
         LOGGER.info("Build client to DataNode: {} success", endpoint);
         return;
       } catch (TTransportException e) {
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index f66262d72c..76f0816963 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -45,9 +45,6 @@ rpc_port=6667
 # Datatype: int
 # data_block_manager_keep_alive_time_in_ms=1000
 
-# Datatype: int
-# mpp_port=7777
-
 # Datatype: String
 # used for communication between cluster nodes.
 # if this parameter is commented, then the IP that binded by the hostname will be used.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9540120af5..57d7371a20 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -109,9 +109,6 @@ public class IoTDBConfig {
   /** Port which the JDBC server listens to. */
   private int rpcPort = 6667;
 
-  /** Port which is used for node communication in MPP. */
-  private int mppPort = 7777;
-
   /** Port which the influxdb protocol server listens to. */
   private int influxDBRpcPort = 8086;
 
@@ -2740,14 +2737,6 @@ public class IoTDBConfig {
     this.seriesPartitionSlotNum = seriesPartitionSlotNum;
   }
 
-  public int getMppPort() {
-    return mppPort;
-  }
-
-  public void setMppPort(int mppPort) {
-    this.mppPort = mppPort;
-  }
-
   public int getDataBlockManagerPort() {
     return dataBlockManagerPort;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c11b14ccab..774990e978 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -171,10 +171,6 @@ public class IoTDBDescriptor {
           Boolean.parseBoolean(
               properties.getProperty("mpp_mode", Boolean.toString(conf.isMppMode()))));
 
-      conf.setMppPort(
-          Integer.parseInt(
-              properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
-
       conf.setEnableInfluxDBRpcService(
           Boolean.parseBoolean(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 51f3a6d6f3..1f97f3c93c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -51,7 +51,7 @@ public class Coordinator {
   private static final Endpoint LOCAL_HOST =
       new Endpoint(
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
-          IoTDBDescriptor.getInstance().getConfig().getMppPort());
+          IoTDBDescriptor.getInstance().getConfig().getInternalPort());
 
   private final ExecutorService executor;
   private final ScheduledExecutorService scheduledExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 0e59d80855..82abc70c3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -58,11 +58,12 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   public abstract void abort();
 
   protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
+    // TODO (jackie tien) change the port
     InternalService.Iface client =
-        InternalServiceClientFactory.getMppServiceClient(
+        InternalServiceClientFactory.getInternalServiceClient(
             new Endpoint(
                 instance.getHostEndpoint().getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getMppPort()));
+                IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
     TFragmentInstanceStateResp resp =
         client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
     return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index ce60581900..c1c76f1e07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.SynchronizedHandler;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -45,12 +44,12 @@ public class InternalServiceClientFactory {
   private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
 
   // TODO need to be replaced by mature client pool in the future
-  private static final Map<Endpoint, InternalService.Iface> mppServiceClientMap =
+  private static final Map<Endpoint, InternalService.Iface> internalServiceClientMap =
       new ConcurrentHashMap<>();
 
-  public static InternalService.Iface getMppServiceClient(Endpoint endpoint)
+  public static InternalService.Iface getInternalServiceClient(Endpoint endpoint)
       throws TTransportException {
-    return mppServiceClientMap.computeIfAbsent(
+    return internalServiceClientMap.computeIfAbsent(
         endpoint,
         address -> {
           TTransport transport;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index bdece41164..17a193d9b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -48,11 +48,12 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           try {
             for (FragmentInstance instance : instances) {
+              // TODO: (jackie tien) change the port
               InternalService.Iface client =
-                  InternalServiceClientFactory.getMppServiceClient(
+                  InternalServiceClientFactory.getInternalServiceClient(
                       new Endpoint(
                           instance.getHostEndpoint().getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getMppPort()));
+                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index ca5e1f2230..728966daea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -55,11 +55,12 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             () -> {
               try {
                 for (Endpoint endpoint : relatedHost) {
+                  // TODO (jackie tien) change the port
                   InternalService.Iface client =
-                      InternalServiceClientFactory.getMppServiceClient(
+                      InternalServiceClientFactory.getInternalServiceClient(
                           new Endpoint(
                               endpoint.getIp(),
-                              IoTDBDescriptor.getInstance().getConfig().getMppPort()));
+                              IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
                   client.cancelQuery(new TCancelQueryReq(queryId.getId()));
                 }
               } catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index e7fdc7290e..68c3aec142 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
-import org.apache.iotdb.db.service.thrift.impl.DataNodeManagementServiceImpl;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class DataNode implements DataNodeMBean {
+public class DataNode {
   private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
 
   private final String mbeanName =
@@ -171,9 +170,6 @@ public class DataNode implements DataNodeMBean {
     /** Register services */
     JMXService.registerMBean(getInstance(), mbeanName);
     // TODO: move rpc service initialization from iotdb instance here
-    DataNodeManagementServiceImpl dataNodeInternalServiceImpl = new DataNodeManagementServiceImpl();
-    DataNodeManagementServer.getInstance().initSyncedServiceImpl(dataNodeInternalServiceImpl);
-    registerManager.register(DataNodeManagementServer.getInstance());
     // init influxDB MManager
     if (IoTDBDescriptor.getInstance().getConfig().isEnableInfluxDBRpcService()) {
       IoTDB.initInfluxDBMManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeMBean.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeMBean.java
deleted file mode 100644
index 03335b3527..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeMBean.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-public interface DataNodeMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
deleted file mode 100644
index 6c52ac4066..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.service;
-
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
-import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.service.thrift.handler.DataNodeManagementServiceHandler;
-import org.apache.iotdb.db.service.thrift.impl.DataNodeManagementServiceImpl;
-import org.apache.iotdb.service.rpc.thrift.ManagementIService;
-
-public class DataNodeManagementServer extends ThriftService
-    implements DataNodeManagementServerMBean {
-
-  private DataNodeManagementServiceImpl impl;
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.DATA_NODE_MANAGEMENT_SERVICE;
-  }
-
-  @Override
-  public ThriftService getImplementation() {
-    return DataNodeInternalServerHolder.INSTANCE;
-  }
-
-  @Override
-  public void initSyncedServiceImpl(Object serviceImpl) {
-    impl = (DataNodeManagementServiceImpl) serviceImpl;
-    super.initSyncedServiceImpl(serviceImpl);
-  }
-
-  @Override
-  public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-    processor = new ManagementIService.Processor<>(impl);
-  }
-
-  @Override
-  public void initThriftServiceThread()
-      throws IllegalAccessException, InstantiationException, ClassNotFoundException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    try {
-      thriftServiceThread =
-          new ThriftServiceThread(
-              processor,
-              getID().getName(),
-              ThreadName.DATA_NODE_MANAGEMENT_RPC_CLIENT.getName(),
-              getBindIP(),
-              getBindPort(),
-              config.getRpcMaxConcurrentClientNum(),
-              config.getThriftServerAwaitTimeForStopService(),
-              new DataNodeManagementServiceHandler(impl),
-              IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
-    } catch (RPCServiceException e) {
-      throw new IllegalAccessException(e.getMessage());
-    }
-    thriftServiceThread.setName(ThreadName.DATA_NODE_MANAGEMENT_RPC_SERVER.getName());
-  }
-
-  @Override
-  public String getBindIP() {
-    return IoTDBDescriptor.getInstance().getConfig().getInternalIp();
-  }
-
-  @Override
-  public int getBindPort() {
-    return IoTDBDescriptor.getInstance().getConfig().getInternalPort();
-  }
-
-  public static DataNodeManagementServer getInstance() {
-    return DataNodeInternalServerHolder.INSTANCE;
-  }
-
-  private static class DataNodeInternalServerHolder {
-
-    private static final DataNodeManagementServer INSTANCE = new DataNodeManagementServer();
-
-    private DataNodeInternalServerHolder() {}
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServerMBean.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServerMBean.java
deleted file mode 100644
index e87abeab77..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServerMBean.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.service;
-
-public interface DataNodeManagementServerMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 5236d2bf87..fbe6c4634e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.thrift.handler.InternalServiceThriftHandler;
+import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService.Processor;
 
 public class InternalService extends ThriftService implements InternalServiceMBean {
@@ -82,7 +84,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
 
   @Override
   public int getBindPort() {
-    return IoTDBDescriptor.getInstance().getConfig().getMppPort();
+    return IoTDBDescriptor.getInstance().getConfig().getInternalPort();
   }
 
   private static class InternalServiceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
deleted file mode 100644
index c3c398bbf5..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.service;
-
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.GroupType;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.db.consensus.ConsensusImpl;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
-import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
-import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
-import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
-
-import org.apache.thrift.TException;
-
-public class InternalServiceImpl implements InternalService.Iface {
-
-  public InternalServiceImpl() {
-    super();
-  }
-
-  @Override
-  public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
-    QueryType type = QueryType.valueOf(req.queryType);
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.create(
-            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
-    switch (type) {
-      case READ:
-        ConsensusReadResponse readResp =
-            ConsensusImpl.getInstance()
-                .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
-        FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
-        return new TSendFragmentInstanceResp(!info.getState().isFailed());
-      case WRITE:
-        TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
-        ConsensusWriteResponse resp =
-            ConsensusImpl.getInstance()
-                .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
-        // TODO need consider more status
-        response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
-        response.setMessage(resp.getStatus().message);
-        return response;
-    }
-    return null;
-  }
-
-  @Override
-  public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
-    FragmentInstanceInfo info =
-        FragmentInstanceManager.getInstance()
-            .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
-    return new TFragmentInstanceStateResp(info.getState().toString());
-  }
-
-  @Override
-  public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-
-    // TODO need to be implemented and currently in order not to print NotImplementedException log,
-    // we simply return null
-    return null;
-    //    throw new NotImplementedException();
-  }
-
-  @Override
-  public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws TException {
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) throws TException {
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/DataNodeManagementServiceHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/DataNodeManagementServiceHandler.java
deleted file mode 100644
index e6054f6ade..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/DataNodeManagementServiceHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service.thrift.handler;
-
-import org.apache.iotdb.db.service.thrift.impl.DataNodeManagementServiceImpl;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class DataNodeManagementServiceHandler implements TServerEventHandler {
-  private DataNodeManagementServiceImpl serviceImpl;
-
-  public DataNodeManagementServiceHandler(DataNodeManagementServiceImpl serviceImpl) {
-    this.serviceImpl = serviceImpl;
-  }
-
-  @Override
-  public void preServe() {
-    // nothing
-  }
-
-  @Override
-  public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
-    // nothing
-    return null;
-  }
-
-  @Override
-  public void deleteContext(
-      ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
-    // release query resources.
-    serviceImpl.handleClientExit();
-  }
-
-  @Override
-  public void processContext(
-      ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
-    // nothing
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InternalServiceThriftHandler.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
rename to server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InternalServiceThriftHandler.java
index bfaf11c583..72c81e48d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceThriftHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/InternalServiceThriftHandler.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.service;
+package org.apache.iotdb.db.service.thrift.handler;
 
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
similarity index 66%
rename from server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
rename to server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 828bfe355f..bf5f4ab311 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -25,10 +25,14 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
@@ -36,12 +40,14 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.mpp.rpc.thrift.*;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.CreateDataRegionReq;
-import org.apache.iotdb.service.rpc.thrift.CreateSchemaRegionReq;
-import org.apache.iotdb.service.rpc.thrift.ManagementIService;
-import org.apache.iotdb.service.rpc.thrift.MigrateDataRegionReq;
-import org.apache.iotdb.service.rpc.thrift.MigrateSchemaRegionReq;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -51,12 +57,76 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManagementServiceImpl.class);
+public class InternalServiceImpl implements InternalService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class);
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
   private final IConsensus consensusImpl = ConsensusImpl.getInstance();
 
+  public InternalServiceImpl() {
+    super();
+  }
+
+  @Override
+  public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
+    QueryType type = QueryType.valueOf(req.queryType);
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+    switch (type) {
+      case READ:
+        ConsensusReadResponse readResp =
+            ConsensusImpl.getInstance()
+                .read(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+        return new TSendFragmentInstanceResp(!info.getState().isFailed());
+      case WRITE:
+        TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
+        ConsensusWriteResponse resp =
+            ConsensusImpl.getInstance()
+                .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        // TODO: need to consider more status codes
+        response.setAccepted(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
+        response.setMessage(resp.getStatus().message);
+        return response;
+    }
+    return null;
+  }
+
+  @Override
+  public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
+    FragmentInstanceInfo info =
+        FragmentInstanceManager.getInstance()
+            .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+    return new TFragmentInstanceStateResp(info.getState().toString());
+  }
+
+  @Override
+  public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
+
+    // TODO: not implemented yet. For now we simply return null instead of throwing,
+    // so that no NotImplementedException is printed to the log.
+    return null;
+    //    throw new NotImplementedException();
+  }
+
+  @Override
+  public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws TException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) throws TException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public SchemaFetchResponse fetchSchema(SchemaFetchRequest req) throws TException {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public TSStatus createSchemaRegion(CreateSchemaRegionReq req) throws TException {
     TSStatus tsStatus;
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index d5ba4d1f07..3ffc8ddc55 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
diff --git a/thrift/src/main/thrift/management.thrift b/thrift/src/main/thrift/management.thrift
deleted file mode 100644
index eaaad8d18c..0000000000
--- a/thrift/src/main/thrift/management.thrift
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-include "common.thrift"
-namespace java org.apache.iotdb.service.rpc.thrift
-
-typedef i32 int
-typedef i64 long
-
-struct CreateSchemaRegionReq {
-    1: required common.TRegionReplicaSet regionReplicaSet
-    2: required string storageGroup
-}
-
-struct CreateDataRegionReq {
-    1: required common.TRegionReplicaSet regionReplicaSet
-    2: required string storageGroup
-    3: optional long ttl
-}
-
-struct MigrateSchemaRegionReq{
-    1: required int sourceDataNodeID
-    2: required int targetDataNodeID
-    3: required int schemaRegionID
-}
-struct MigrateDataRegionReq{
-    1: required int sourceDataNodeID
-    2: required int targetDataNodeID
-    3: required int dataRegionID
-}
-
-service ManagementIService {
-    /**
-      * Config node will create a schema region on a list of data nodes.
-      *
-      * @param data nodes of the schema region, and schema region id generated by config node
-    **/
-    common.TSStatus createSchemaRegion(CreateSchemaRegionReq req)
-
-    /**
-      * Config node will create a data region on a list of data nodes.
-      *
-      * @param data nodes of the data region, and data region id generated by config node
-    **/
-    common.TSStatus createDataRegion(CreateDataRegionReq req)
-
-    /**
-      * Config node will migrate a schema region from one data node to another
-      *
-      * @param previous data node in the schema region, new data node, and schema region id
-    **/
-    common.TSStatus migrateSchemaRegion(MigrateSchemaRegionReq req)
-
-    /**
-      * Config node will migrate a data region from one data node to another
-      *
-      * @param previous data node in the data region, new data node, and dataregion id
-    **/
-    common.TSStatus migrateDataRegion(MigrateDataRegionReq req)
-
-}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index e3be3e324b..5251210ac8 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -16,9 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
+include "common.thrift"
 namespace java org.apache.iotdb.mpp.rpc.thrift
 
+struct CreateSchemaRegionReq {
+    1: required common.TRegionReplicaSet regionReplicaSet
+    2: required string storageGroup
+}
+
+struct CreateDataRegionReq {
+    1: required common.TRegionReplicaSet regionReplicaSet
+    2: required string storageGroup
+    3: optional i64 ttl
+}
+
+struct MigrateSchemaRegionReq{
+    1: required i32 sourceDataNodeID
+    2: required i32 targetDataNodeID
+    3: required i32 schemaRegionID
+}
+
+struct MigrateDataRegionReq{
+    1: required i32 sourceDataNodeID
+    2: required i32 targetDataNodeID
+    3: required i32 dataRegionID
+}
 
 struct TFragmentInstanceId {
   1: required string queryId
@@ -113,6 +135,9 @@ struct SchemaFetchResponse {
 }
 
 service InternalService {
+
+    // -----------------------------------For Data Node-----------------------------------------------
+
     TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req);
 
     TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req);
@@ -124,6 +149,37 @@ service InternalService {
     TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req);
 
     SchemaFetchResponse fetchSchema(SchemaFetchRequest req)
+
+
+    // -----------------------------------For Config Node-----------------------------------------------
+
+    /**
+      * Config node will create a schema region on a list of data nodes.
+      *
+      * @param data nodes of the schema region, and schema region id generated by config node
+    **/
+    common.TSStatus createSchemaRegion(CreateSchemaRegionReq req)
+
+    /**
+      * Config node will create a data region on a list of data nodes.
+      *
+      * @param data nodes of the data region, and data region id generated by config node
+    **/
+    common.TSStatus createDataRegion(CreateDataRegionReq req)
+
+    /**
+      * Config node will migrate a schema region from one data node to another
+      *
+      * @param previous data node in the schema region, new data node, and schema region id
+    **/
+    common.TSStatus migrateSchemaRegion(MigrateSchemaRegionReq req)
+
+    /**
+      * Config node will migrate a data region from one data node to another
+      *
+      * @param previous data node in the data region, new data node, and data region id
+    **/
+    common.TSStatus migrateDataRegion(MigrateDataRegionReq req)
 }
 
 service DataBlockService {