Posted to commits@doris.apache.org by ya...@apache.org on 2021/04/21 02:26:10 UTC

[incubator-doris] branch master updated: [Refactor] Remove jprotobuf and use grpc client to connect brpc service (#5650)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b121ad6  [Refactor] Remove jprotobuf and use grpc client to connect brpc service (#5650)
b121ad6 is described below

commit b121ad6b953aaa29ad38491a435f09e049c40d3f
Author: Zhengguo Yang <78...@qq.com>
AuthorDate: Wed Apr 21 10:25:58 2021 +0800

    [Refactor] Remove jprotobuf and use grpc client to connect brpc service (#5650)
---
 be/src/runtime/buffer_control_block.cpp            |  28 ++-
 be/src/runtime/buffer_control_block.h              |  10 +-
 be/src/service/internal_service.cpp                |  14 +-
 be/src/service/internal_service.h                  |   2 +-
 fe/fe-core/pom.xml                                 | 102 ++++-----
 .../main/java/org/apache/doris/common/Config.java  |  14 +-
 .../main/java/org/apache/doris/common/Status.java  |   8 +-
 .../common/proc/CurrentQueryInfoProvider.java      |  38 ++--
 .../org/apache/doris/common/util/DebugUtil.java    |   6 +-
 .../org/apache/doris/common/util/KafkaUtil.java    |  50 ++---
 .../org/apache/doris/planner/OlapScanNode.java     |   8 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |  14 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 105 +++++-----
 .../java/org/apache/doris/qe/ResultReceiver.java   |  44 ++--
 .../main/java/org/apache/doris/qe/RowBatch.java    |   2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  63 +++---
 .../main/java/org/apache/doris/qe/cache/Cache.java |   3 +-
 .../org/apache/doris/qe/cache/CacheAnalyzer.java   |  19 +-
 .../org/apache/doris/qe/cache/CacheBeProxy.java    |  72 +++----
 .../apache/doris/qe/cache/CacheCoordinator.java    |  20 +-
 .../java/org/apache/doris/qe/cache/CacheProxy.java | 228 +--------------------
 .../org/apache/doris/qe/cache/PartitionCache.java  |  50 +++--
 .../org/apache/doris/qe/cache/RowBatchBuilder.java |  47 +++--
 .../java/org/apache/doris/qe/cache/SqlCache.java   |  35 +++-
 .../org/apache/doris/rpc/BackendServiceClient.java |  84 ++++++++
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 171 ++++++----------
 .../java/org/apache/doris/rpc/PBackendService.java |  67 ------
 .../apache/doris/rpc/PExecPlanFragmentRequest.java |  24 ---
 .../org/apache/doris/rpc/PFetchDataRequest.java    |  33 ---
 .../doris/rpc/PTriggerProfileReportRequest.java    |  42 ----
 .../doris/rpc/ThriftClientAttachmentHandler.java   |  33 ---
 .../org/apache/doris/planner/QueryPlanTest.java    |   5 +-
 .../org/apache/doris/qe/ConnectProcessorTest.java  |   9 +-
 .../org/apache/doris/qe/PartitionCacheTest.java    |  11 +-
 .../org/apache/doris/utframe/MockedBackend.java    |  26 +--
 .../apache/doris/utframe/MockedBackendFactory.java | 140 +++++++------
 fe/pom.xml                                         |  54 +++--
 gensrc/proto/internal_service.proto                |   5 +
 38 files changed, 683 insertions(+), 1003 deletions(-)
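
For context on the overall shape of the change: brpc servers also speak the gRPC wire protocol over HTTP/2, which is what lets the Java FE drop jprotobuf's dynamic proxy and call the BE's existing brpc port with a stock grpc-java stub generated from gensrc/proto/internal_service.proto. A minimal sketch of the new client shape, assuming the generated InternalService messages and a PBackendServiceGrpc stub (names follow the proto but are illustrative, not the exact BackendServiceClient code):

    import io.grpc.ManagedChannel;
    import io.grpc.netty.NettyChannelBuilder;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class GrpcBackendClientSketch {
        private final ManagedChannel channel;
        private final PBackendServiceGrpc.PBackendServiceFutureStub stub;

        public GrpcBackendClientSketch(String host, int brpcPort) {
            // brpc understands the gRPC protocol, so a plain grpc-java
            // channel can reach the BE's brpc service directly.
            channel = NettyChannelBuilder.forAddress(host, brpcPort)
                    .usePlaintext()
                    .build();
            stub = PBackendServiceGrpc.newFutureStub(channel);
        }

        // One generated method per rpc in the .proto; the hand-written
        // request wrappers and the thrift attachment handler that this
        // commit deletes are no longer needed.
        public Future<InternalService.PFetchDataResult> fetchDataAsync(
                InternalService.PFetchDataRequest request) {
            return stub.fetchData(request);
        }

        public void close() throws InterruptedException {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    }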

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 9d21999..3846fa5 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -45,16 +45,28 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
 }
 
 void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos) {
-    uint8_t* buf = nullptr;
-    uint32_t len = 0;
-    ThriftSerializer ser(false, 4096);
-    auto st = ser.serialize(&t_result->result_batch, &len, &buf);
-    if (st.ok()) {
-        cntl->response_attachment().append(buf, len);
+    Status st = Status::OK();
+    if (t_result != nullptr) {
+        uint8_t* buf = nullptr;
+        uint32_t len = 0;
+        ThriftSerializer ser(false, 4096);
+        st = ser.serialize(&t_result->result_batch, &len, &buf);
+        if (st.ok()) {
+            if (resp_in_attachment) {
+                // TODO(yangzhengguo): this is only for compatibility with old versions and should be removed in release 0.15
+                cntl->response_attachment().append(buf, len);
+            } else {
+                result->set_row_batch(std::string((const char*)buf, len));
+            }
+            result->set_packet_seq(packet_seq);
+            result->set_eos(eos);
+        } else {
+            LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st.get_error_msg();
+        }
+    } else {
+        result->set_empty_batch(true);
         result->set_packet_seq(packet_seq);
         result->set_eos(eos);
-    } else {
-        LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st.get_error_msg();
     }
     st.to_protobuf(result->mutable_status());
     done->Run();
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 517ae0b..f3e12e2 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -44,12 +44,18 @@ class PFetchDataResult;
 
 struct GetResultBatchCtx {
     brpc::Controller* cntl = nullptr;
+    // In version 0.15, we changed the brpc client from jprotobuf to grpc,
+    // and the response data moved from the rpc attachment to the protobuf body.
+    // This variable is for backwards compatibility when upgrading Doris.
+    // If set to true, the response data is still transferred as an attachment.
+    // If set to false, the response data is transferred in the protobuf body.
+    bool resp_in_attachment = true;
     PFetchDataResult* result = nullptr;
     google::protobuf::Closure* done = nullptr;
 
-    GetResultBatchCtx(brpc::Controller* cntl_, PFetchDataResult* result_,
+    GetResultBatchCtx(brpc::Controller* cntl_, bool resp_in_attachment_, PFetchDataResult* result_,
                       google::protobuf::Closure* done_)
-            : cntl(cntl_), result(result_), done(done_) {}
+            : cntl(cntl_), resp_in_attachment(resp_in_attachment_), result(result_), done(done_) {}
 
     void on_failure(const Status& status);
     void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 51480fb..a0c4bc9 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -77,7 +77,13 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
                                                  google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    auto st = _exec_plan_fragment(cntl);
+    auto st = Status::OK();
+    if (request->has_request()) {
+        st = _exec_plan_fragment(request->request());
+    } else {
+        // TODO(yangzhengguo): this is only for compatibility with old versions and should be removed in release 0.15
+        st = _exec_plan_fragment(cntl->request_attachment().to_string());
+    }
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
     }
@@ -129,8 +135,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
 }
 
 template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
-    auto ser_request = cntl->request_attachment().to_string();
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request) {
     TExecPlanFragmentParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -172,7 +177,8 @@ void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_b
                                          const PFetchDataRequest* request, PFetchDataResult* result,
                                          google::protobuf::Closure* done) {
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+    bool resp_in_attachment = request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
+    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, resp_in_attachment, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
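
Both compatibility branches above follow the same pattern: during a rolling upgrade the BE accepts the old encoding (rpc attachment) and the new one (protobuf body), deciding either by whether the new field is present (exec_plan_fragment) or by an explicit flag (resp_in_attachment in fetch_data). A hedged sketch of the new FE-side encoding for exec_plan_fragment, assuming the `request` bytes field this commit adds to PExecPlanFragmentRequest (the wrapper class is illustrative):

    import com.google.protobuf.ByteString;
    import org.apache.doris.proto.InternalService;
    import org.apache.doris.thrift.TExecPlanFragmentParams;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;

    public final class ExecRequestSketch {
        // Sketch: an upgraded FE thrift-serializes the plan fragment params
        // into the protobuf `request` field; the attachment path is kept on
        // the BE only for FEs that have not been upgraded yet.
        public static InternalService.PExecPlanFragmentRequest build(
                TExecPlanFragmentParams tParams) throws TException {
            byte[] serialized = new TSerializer().serialize(tParams); // binary protocol
            return InternalService.PExecPlanFragmentRequest.newBuilder()
                    .setRequest(ByteString.copyFrom(serialized))
                    .build();
        }
    }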
 
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 1010f14..91196ae 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -89,7 +89,7 @@ public:
                      PCacheResponse* response, google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(brpc::Controller* cntl);
+    Status _exec_plan_fragment(const std::string& s_request);
 
 private:
     ExecEnv* _exec_env;
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 9a91ab2..0711f55 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -229,36 +229,11 @@ under the License.
             <artifactId>joda-time</artifactId>
         </dependency>
 
-        <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf -->
-        <dependency>
-            <groupId>com.baidu</groupId>
-            <artifactId>jprotobuf</artifactId>
-            <classifier>jar-with-dependencies</classifier>
-        </dependency>
-
-        <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
-        <dependency>
-            <groupId>com.baidu</groupId>
-            <artifactId>jprotobuf-rpc-common</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
 
-        <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-core -->
-        <dependency>
-            <groupId>com.baidu</groupId>
-            <artifactId>jprotobuf-rpc-core</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <!-- https://mvnrepository.com/artifact/org.json/json -->
         <dependency>
             <groupId>org.json</groupId>
@@ -607,6 +582,20 @@ under the License.
             <artifactId>tree-printer</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
@@ -674,28 +663,46 @@ under the License.
                 </configuration>
             </plugin>
 
-            <!-- run make to generate Version and builtin -->
-            <!-- also parse the proto for FE -->
+
+            <!-- protobuf -->
             <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>exec-maven-plugin</artifactId>
-                <version>3.0.0</version>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>3.11.1</version>
                 <executions>
                     <execution>
-                        <id>make-dir</id>
                         <phase>generate-sources</phase>
                         <goals>
-                            <goal>exec</goal>
+                            <goal>run</goal>
                         </goals>
                         <configuration>
-                            <executable>mkdir</executable>
-                            <arguments>
-                                <argument>-p</argument>
-                                <argument>${basedir}/target/generated-sources/proto</argument>
-                            </arguments>
-                            <skip>${skip.plugin}</skip>
+                            <protocCommand>${doris.thirdparty}/installed/bin/protoc</protocCommand>
+                            <!-- You can use the following protocArtifact instead of protocCommand, so that you don't need to install the protobuf tools -->
+                            <!--protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact-->
+                            <protocVersion>${protobuf.version}</protocVersion>
+                            <inputDirectories>
+                                <include>${doris.home}/gensrc/proto</include>
+                            </inputDirectories>
+                            <outputTargets>
+                                <outputTarget>
+                                    <type>java</type>
+                                </outputTarget>
+                                <outputTarget>
+                                    <type>grpc-java</type>
+                                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
+                                </outputTarget>
+                            </outputTargets>
                         </configuration>
                     </execution>
+                </executions>
+            </plugin>
+            <!-- run make to generate Version and builtin -->
+            <!-- also parse the proto for FE -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
                     <execution>
                         <id>gensrc</id>
                         <phase>generate-sources</phase>
@@ -711,24 +718,6 @@ under the License.
                             <skip>${skip.plugin}</skip>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>gen_proto</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <!-- DO NOT use goal 'java', it will terminate the VM after done -->
-                            <goal>exec</goal>
-                        </goals>
-                        <configuration>
-                            <executable>${java.home}/bin/java</executable>
-                            <arguments>
-                                <argument>-jar</argument>
-                                <argument>${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar</argument>
-                                <argument>--java_out=${basedir}/target/generated-sources/proto</argument>
-                                <argument>${doris.home}/gensrc/proto/internal_service.proto</argument>
-                            </arguments>
-                            <skip>${skip.plugin}</skip>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
 
@@ -748,7 +737,6 @@ under the License.
                             <sources>
                                 <!-- add arbitrary num of src dirs here -->
                                 <source>${basedir}/target/generated-sources/build/</source>
-                                <source>${basedir}/target/generated-sources/proto/</source>
                             </sources>
                         </configuration>
                     </execution>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f949eec..3a42551 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -900,13 +900,6 @@ public class Config extends ConfigBase {
     // All frontends will get tablet stat from all backends at each interval
     @ConfField public static int tablet_stat_update_interval_second = 300;  // 5 min
 
-    // May be necessary to modify the following BRPC configurations in high concurrency scenarios. 
-    // The number of concurrent requests BRPC can processed
-    @ConfField public static int brpc_number_of_concurrent_requests_processed = 4096;
-
-    // BRPC idle wait time (ms)
-    @ConfField public static int brpc_idle_wait_max_time = 10000;
-    
     /**
      * if set to false, auth check will be disable, in case some goes wrong with the new privilege system. 
      */
@@ -1355,4 +1348,11 @@ public class Config extends ConfigBase {
      */
     @ConfField
     public static boolean enable_outfile_to_local = false;
+
+    /**
+     * Used to set the initial flow-control window size of the gRPC client channel, and also used as the max message size.
+     * When the result set is large, you may need to increase this value.
+     */
+    @ConfField
+    public static int grpc_max_message_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
 }
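
The comment above ties one config value to two separate grpc-java channel limits: the initial HTTP/2 flow-control window and the maximum inbound message size. On a NettyChannelBuilder those are distinct knobs, so the channel construction plausibly looks like this sketch (host and brpcPort are placeholders, not the actual BackendServiceClient code):

    import io.grpc.ManagedChannel;
    import io.grpc.netty.NettyChannelBuilder;

    // Sketch: grpc-java defaults both limits to a few MB, which a large
    // result set can exceed, hence the single oversized config value.
    ManagedChannel channel = NettyChannelBuilder
            .forAddress(host, brpcPort) // placeholders
            .flowControlWindow(Config.grpc_max_message_size_bytes)
            .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
            .usePlaintext()
            .build();
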
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 2fad775..7d6b7c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.common;
 
-import org.apache.doris.proto.PStatus;
+import org.apache.doris.proto.Status.PStatus;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
@@ -81,9 +81,9 @@ public class Status {
     }
 
     public void setPstatus(PStatus status) {
-        this.errorCode = TStatusCode.findByValue(status.status_code);
-        if (status.error_msgs != null && !status.error_msgs.isEmpty()) {
-            this.errorMsg = status.error_msgs.get(0);
+        this.errorCode = TStatusCode.findByValue(status.getStatusCode());
+        if (!status.getErrorMsgsList().isEmpty()) {
+            this.errorMsg = status.getErrorMsgs(0);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index 46add4d..0dd2c8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -23,11 +23,10 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Counter;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.QueryStatisticsItem;
 import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PTriggerProfileReportRequest;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -203,9 +202,10 @@ public class CurrentQueryInfoProvider {
                 }
                 // specified query instance which will report.
                 if (!allQuery) {
-                    final PUniqueId pUId = new PUniqueId();
-                    pUId.hi = instanceInfo.getInstanceId().hi;
-                    pUId.lo = instanceInfo.getInstanceId().lo;
+                    final Types.PUniqueId pUId = Types.PUniqueId.newBuilder()
+                            .setHi(instanceInfo.getInstanceId().hi)
+                            .setLo(instanceInfo.getInstanceId().lo)
+                            .build();
                     request.addInstanceId(pUId);
                 }
             }
@@ -213,13 +213,13 @@ public class CurrentQueryInfoProvider {
         recvResponse(sendRequest(requests));
     }
 
-    private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest(
+    private List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> sendRequest(
             Map<TNetworkAddress, Request> requests) throws AnalysisException {
-        final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList();
+        final List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures = Lists.newArrayList();
         for (TNetworkAddress address : requests.keySet()) {
             final Request request = requests.get(address);
-            final PTriggerProfileReportRequest pbRequest =
-                    new PTriggerProfileReportRequest(request.getInstanceIds());
+            final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest
+                    .newBuilder().addAllInstanceIds(request.getInstanceIds()).build();
             try {
                 futures.add(Pair.create(request, BackendServiceProxy.getInstance().
                         triggerProfileReportAsync(address, pbRequest)));
@@ -230,18 +230,18 @@ public class CurrentQueryInfoProvider {
         return futures;
     }
 
-    private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures)
+    private void recvResponse(List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures)
             throws AnalysisException {
         final String reasonPrefix = "Fail to receive result.";
-        for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) {
+        for (Pair<Request, Future<InternalService.PTriggerProfileReportResult>> pair : futures) {
             try {
-                final PTriggerProfileReportResult result
+                final InternalService.PTriggerProfileReportResult result
                         = pair.second.get(2, TimeUnit.SECONDS);
-                final TStatusCode code = TStatusCode.findByValue(result.status.status_code);
+                final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     String errMsg = "";
-                    if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
-                        errMsg = result.status.error_msgs.get(0);
+                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                        errMsg = result.getStatus().getErrorMsgs(0);
                     }
                     throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
                             + " reason:" + errMsg);
@@ -347,7 +347,7 @@ public class CurrentQueryInfoProvider {
 
     private static class Request {
         private final TNetworkAddress address;
-        private final List<PUniqueId> instanceIds;
+        private final List<Types.PUniqueId> instanceIds;
 
         public Request(TNetworkAddress address) {
             this.address = address;
@@ -358,11 +358,11 @@ public class CurrentQueryInfoProvider {
             return address;
         }
 
-        public List<PUniqueId> getInstanceIds() {
+        public List<Types.PUniqueId> getInstanceIds() {
             return instanceIds;
         }
 
-        public void addInstanceId(PUniqueId instanceId) {
+        public void addInstanceId(Types.PUniqueId instanceId) {
             this.instanceIds.add(instanceId);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
index 012a5ee..c8eca0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -18,7 +18,7 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.common.Pair;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
 import org.apache.doris.thrift.TUniqueId;
 
 import java.io.PrintWriter;
@@ -135,9 +135,9 @@ public class DebugUtil {
         return builder.toString();
     }
 
-    public static String printId(final PUniqueId id) {
+    public static String printId(final Types.PUniqueId id) {
         StringBuilder builder = new StringBuilder();
-        builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
+        builder.append(Long.toHexString(id.getHi())).append("-").append(Long.toHexString(id.getLo()));
         return builder.toString();
     }
     
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 6f75f8e..4ecb520 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -21,19 +21,13 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.proto.PKafkaLoadInfo;
-import org.apache.doris.proto.PKafkaMetaProxyRequest;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PStringPair;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
-import com.google.common.collect.Lists;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
@@ -61,31 +56,30 @@ public class KafkaUtil {
             address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
             
             // create request
-            PKafkaLoadInfo kafkaLoadInfo = new PKafkaLoadInfo();
-            kafkaLoadInfo.brokers = brokerList;
-            kafkaLoadInfo.topic = topic;
-            for (Map.Entry<String, String> entry : convertedCustomProperties.entrySet()) {
-                PStringPair pair = new PStringPair();
-                pair.key = entry.getKey();
-                pair.val = entry.getValue();
-                if (kafkaLoadInfo.properties == null) {
-                    kafkaLoadInfo.properties = Lists.newArrayList();
-                }
-                kafkaLoadInfo.properties.add(pair);
-            }
-            PKafkaMetaProxyRequest kafkaRequest = new PKafkaMetaProxyRequest();
-            kafkaRequest.kafka_info = kafkaLoadInfo;
-            PProxyRequest request = new PProxyRequest();
-            request.kafka_meta_request = kafkaRequest;
+            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+                    InternalService.PKafkaMetaProxyRequest.newBuilder()
+                            .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+                                    .setBrokers(brokerList)
+                                    .setTopic(topic)
+                                    .addAllProperties(
+                                            convertedCustomProperties.entrySet().stream().map(
+                                            e -> InternalService.PStringPair.newBuilder()
+                                                    .setKey(e.getKey())
+                                                    .setVal(e.getValue())
+                                                    .build()
+                                            ).collect(Collectors.toList())
+                                    )
+                            )
+            ).build();
             
             // get info
-            Future<PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            PProxyResult result = future.get(5, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.status.status_code);
+            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
+            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " + result.status.error_msgs);
+                throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
             } else {
-                return result.kafka_meta_result.partition_ids;
+                return result.getKafkaMetaResult().getPartitionIdsList();
             }
         } catch (Exception e) {
             LOG.warn("failed to get partitions.", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 4d260ae..3ec9866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -242,9 +242,13 @@ public class OlapScanNode extends ScanNode {
             this.selectedIndexId = selectedIndexId;
             setIsPreAggregation(isPreAggregation, reasonOfDisable);
             updateColumnType();
-            LOG.info("Using the new scan range info instead of the old one. {}, {}", situation ,scanRangeInfo);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Using the new scan range info instead of the old one. {}, {}", situation ,scanRangeInfo);
+            }
         } else {
-            LOG.warn("Using the old scan range info instead of the new one. {}, {}", situation, scanRangeInfo);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Using the old scan range info instead of the new one. {}, {}", situation, scanRangeInfo);
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 2934923..8a30f1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -44,7 +44,7 @@ import org.apache.doris.mysql.MysqlProto;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.mysql.MysqlServerStatusFlag;
 import org.apache.doris.plugin.AuditEvent.EventType;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
@@ -107,16 +107,16 @@ public class ConnectProcessor {
         ctx.getState().setOk();
     }
 
-    private void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStatistics statistics) {
+    private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics) {
         // slow query
         long endTime = System.currentTimeMillis();
         long elapseMs = endTime - ctx.getStartTime();
         
         ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
             .setState(ctx.getState().toString()).setQueryTime(elapseMs)
-            .setScanBytes(statistics == null ? 0 : statistics.scan_bytes)
-            .setScanRows(statistics == null ? 0 : statistics.scan_rows)
-            .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms)
+            .setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
+            .setScanRows(statistics == null ? 0 : statistics.getScanRows())
+            .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
             .setReturnRows(ctx.getReturnRows())
             .setStmtId(ctx.getStmtId())
             .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
@@ -181,7 +181,7 @@ public class ConnectProcessor {
 
         // execute this query.
         StatementBase parsedStmt = null;
-        List<Pair<StatementBase, PQueryStatistics>> auditInfoList = Lists.newArrayList();
+        List<Pair<StatementBase, Data.PQueryStatistics>> auditInfoList = Lists.newArrayList();
         boolean alreadyAddedToAuditInfoList = false;
         try {
             List<StatementBase> stmts = analyze(originStmt);
@@ -232,7 +232,7 @@ public class ConnectProcessor {
 
         // audit after exec
         if (!auditInfoList.isEmpty()) {
-            for (Pair<StatementBase, PQueryStatistics> audit : auditInfoList) {
+            for (Pair<StatementBase, Data.PQueryStatistics> audit : auditInfoList) {
                 auditAfterExec(originStmt.replace("\n", " "), audit.first, audit.second);
             }
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7b8e880..c36e6b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -50,9 +50,7 @@ import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
-import org.apache.doris.proto.PStatus;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -80,11 +78,6 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TTabletCommitInfo;
 import org.apache.doris.thrift.TUniqueId;
 
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.HashMultiset;
@@ -94,6 +87,11 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -318,7 +316,7 @@ public class Coordinator {
         for (PlanFragment fragment : fragments) {
             fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
         }
-    
+
         // set inputFragments
         for (PlanFragment fragment : fragments) {
             if (!(fragment.getSink() instanceof DataStreamSink)) {
@@ -335,7 +333,7 @@ public class Coordinator {
         queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
 
         fragmentProfile = new ArrayList<RuntimeProfile>();
-        for (int i = 0; i < fragmentSize; i ++) {
+        for (int i = 0; i < fragmentSize; i++) {
             fragmentProfile.add(new RuntimeProfile("Fragment " + i));
             queryProfile.addChild(fragmentProfile.get(i));
         }
@@ -469,7 +467,8 @@ public class Coordinator {
                 int instanceNum = params.instanceExecParams.size();
                 Preconditions.checkState(instanceNum > 0);
                 List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-                List<Pair<BackendExecState, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
+                List<Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>>> futures =
+                        Lists.newArrayList();
 
                 // update memory limit for colocate join
                 if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
@@ -511,16 +510,16 @@ public class Coordinator {
                     backendIdx++;
                 }
 
-                for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
+                for (Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
                     TStatusCode code;
                     String errMsg = null;
                     Exception exception = null;
                     try {
-                        PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
+                        InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
                                 TimeUnit.MILLISECONDS);
-                        code = TStatusCode.findByValue(result.status.status_code);
-                        if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
-                            errMsg = result.status.error_msgs.get(0);
+                        code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                        if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                            errMsg = result.getStatus().getErrorMsgsList().get(0);
                         }
                     } catch (ExecutionException e) {
                         LOG.warn("catch a execute exception", e);
@@ -548,7 +547,7 @@ public class Coordinator {
                         LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}",
                                 errMsg, code, fragment.getFragmentId(),
                                 pair.first.address.hostname, pair.first.address.port);
-                        cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
+                        cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
                         switch (code) {
                             case TIMEOUT:
                                 throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
@@ -669,7 +668,7 @@ public class Coordinator {
             queryStatus.setStatus(status);
             LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}",
                     jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN");
-            cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
+            cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
         } finally {
             lock.unlock();
         }
@@ -720,13 +719,13 @@ public class Coordinator {
             this.returnedAllResults = true;
 
             // if this query is a block query do not cancel.
-            Long numLimitRows  = fragments.get(0).getPlanRoot().getLimit();
+            Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
             boolean hasLimit = numLimitRows > 0;
             if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
                 LOG.debug("no block query, return num >= limit rows, need cancel");
-                cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH);
+                cancelInternal(InternalService.PPlanFragmentCancelReason.LIMIT_REACH);
             }
-        } else {
+        } else if (resultBatch.getBatch() != null) {
             numReceivedRows += resultBatch.getBatch().getRowsSize();
         }
 
@@ -746,13 +745,13 @@ public class Coordinator {
                 queryStatus.setStatus(Status.CANCELLED);
             }
             LOG.warn("cancel execution of query, this is outside invoke");
-            cancelInternal(PPlanFragmentCancelReason.USER_CANCEL);
+            cancelInternal(InternalService.PPlanFragmentCancelReason.USER_CANCEL);
         } finally {
             unlock();
         }
     }
 
-    private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
+    private void cancelInternal(InternalService.PPlanFragmentCancelReason cancelReason) {
         if (null != receiver) {
             receiver.cancel();
         }
@@ -760,11 +759,11 @@ public class Coordinator {
         if (profileDoneSignal != null) {
             // count down to zero to notify all objects waiting for this
             profileDoneSignal.countDownToZero(new Status());
-            LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e->DebugUtil.printId(e.getKey())).toArray());
+            LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e -> DebugUtil.printId(e.getKey())).toArray());
         }
     }
 
-    private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
+    private void cancelRemoteFragmentsAsync(InternalService.PPlanFragmentCancelReason cancelReason) {
         for (BackendExecState backendExecState : backendExecStates) {
             backendExecState.cancelFragmentInstance(cancelReason);
         }
@@ -971,7 +970,7 @@ public class Coordinator {
                     throw new UserException("there is no scanNode Backend");
                 }
                 this.addressToBackendID.put(execHostport, backendIdRef.getRef());
-                FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 
+                FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
                         0, params);
                 params.instanceExecParams.add(instanceParam);
                 continue;
@@ -1018,7 +1017,7 @@ public class Coordinator {
                     // random select some instance
                     // get distinct host,  when parallel_fragment_exec_instance_num > 1, single host may execute several instances
                     Set<TNetworkAddress> hostSet = Sets.newHashSet();
-                    for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
+                    for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                         hostSet.add(execParams.host);
                     }
                     List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
@@ -1028,7 +1027,7 @@ public class Coordinator {
                         params.instanceExecParams.add(instanceParam);
                     }
                 } else {
-                    for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
+                    for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                         FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
                         params.instanceExecParams.add(instanceParam);
                     }
@@ -1125,7 +1124,7 @@ public class Coordinator {
 
         return false;
     }
-    
+
     // Returns the id of the leftmost node of any of the gives types in 'plan_root',
     // or INVALID_PLAN_NODE_ID if no such node present.
     private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
@@ -1206,7 +1205,6 @@ public class Coordinator {
             for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
                 FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
 
-
                 for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
                     instanceParam.bucketSeqSet.add(nodeScanRangeMap.first);
                     for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
@@ -1269,14 +1267,14 @@ public class Coordinator {
         }
         Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
         HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
-        for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+        for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
             //fill scanRangeParamsList
             List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                 getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
             }
 
-            for(TScanRangeLocations location: locations) {
+            for (TScanRangeLocations location : locations) {
                 Map<Integer, List<TScanRangeParams>> scanRanges =
                         findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
 
@@ -1434,7 +1432,7 @@ public class Coordinator {
      * return true otherwise.
      * NOTICE: return true does not mean that coordinator executed success,
      * the caller should check queryStatus for result.
-     * 
+     *
      * We divide the entire waiting process into multiple rounds,
      * with a maximum of 30 seconds per round. And after each round of waiting,
      * check the status of the BE. If the BE status is abnormal, the wait is ended
@@ -1586,7 +1584,7 @@ public class Coordinator {
                 //buckendIdToBucketCountMap does not contain the new backend, insert into it
                 if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
                     buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
-                } else { //buckendIdToBucketCountMap contains the new backend, update it 
+                } else { //buckendIdToBucketCountMap contains the new backend, update it
                     buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
                 }
             } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
@@ -1608,14 +1606,14 @@ public class Coordinator {
             Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
             BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
 
-            for (Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+            for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
                 //fill scanRangeParamsList
                 List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
                 if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                     getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, idToBackend, addressToBackendID);
                 }
 
-                for(TScanRangeLocations location: locations) {
+                for (TScanRangeLocations location : locations) {
                     Map<Integer, List<TScanRangeParams>> scanRanges =
                             findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
 
@@ -1689,7 +1687,6 @@ public class Coordinator {
         }
     }
 
-
     private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
     private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
     // cache the fragment id to its scan node ids. Used for colocate join.
@@ -1711,7 +1708,7 @@ public class Coordinator {
         TNetworkAddress address;
         Backend backend;
         long lastMissingHeartbeatTime = -1;
-        
+
         public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId,
             TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID) {
             this.profileFragmentId = profileFragmentId;
@@ -1769,7 +1766,7 @@ public class Coordinator {
 
         // cancel the fragment instance.
         // return true if cancel success. Otherwise, return false
-        public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason cancelReason) {
+        public synchronized boolean cancelFragmentInstance(InternalService.PPlanFragmentCancelReason cancelReason) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}, fragment instance id={}, reason: {}",
                         this.initiated, this.done, this.hasCanceled, backend.getId(),
@@ -1822,7 +1819,7 @@ public class Coordinator {
             return true;
         }
 
-        public Future<PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
+        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
             TNetworkAddress brpcAddress = null;
             try {
                 brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
@@ -1835,7 +1832,7 @@ public class Coordinator {
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with error code,
                 // so that the following logic will cancel the fragment.
-                return new Future<PExecPlanFragmentResult>() {
+                return new Future<InternalService.PExecPlanFragmentResult>() {
                     @Override
                     public boolean cancel(boolean mayInterruptIfRunning) {
                         return false;
@@ -1852,19 +1849,19 @@ public class Coordinator {
                     }
 
                     @Override
-                    public PExecPlanFragmentResult get() {
-                        PExecPlanFragmentResult result = new PExecPlanFragmentResult();
-                        PStatus pStatus = new PStatus();
-                        pStatus.error_msgs = Lists.newArrayList();
-                        pStatus.error_msgs.add(e.getMessage());
-                        // use THRIFT_RPC_ERROR so that this BE will be added to the blacklist later.
-                        pStatus.status_code = TStatusCode.THRIFT_RPC_ERROR.getValue();
-                        result.status = pStatus;
+                    public InternalService.PExecPlanFragmentResult get() {
+                        InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult
+                                .newBuilder()
+                                .setStatus(org.apache.doris.proto.Status.PStatus.newBuilder()
+                                        .addErrorMsgs(e.getMessage())
+                                        .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
+                                        .build())
+                                .build();
                         return result;
                     }
 
                     @Override
-                    public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
+                    public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
                         return get();
                     }
                 };
@@ -1887,8 +1884,8 @@ public class Coordinator {
     // used to assemble TPlanFragmentExecParas
     protected class FragmentExecParams {
         public PlanFragment fragment;
-        public List<TPlanFragmentDestination> destinations      = Lists.newArrayList();
-        public Map<Integer, Integer>          perExchNumSenders = Maps.newHashMap();
+        public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
+        public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
 
         public List<PlanFragmentId> inputFragments = Lists.newArrayList();
         public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
@@ -1960,8 +1957,8 @@ public class Coordinator {
                 TEsScanRange esScanRange = range.getScanRange().getEsScanRange();
                 if (esScanRange != null) {
                     sb.append("{ index=").append(esScanRange.getIndex())
-                        .append(", shardid=").append(esScanRange.getShardId())
-                        .append("}");
+                            .append(", shardid=").append(esScanRange.getShardId())
+                            .append("}");
                 }
             }
             sb.append("]");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 9166543..7978ebc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -18,10 +18,9 @@
 package org.apache.doris.qe;
 
 import org.apache.doris.common.Status;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PFetchDataRequest;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultBatch;
@@ -45,14 +44,12 @@ public class ResultReceiver {
     private long packetIdx = 0;
     private long timeoutTs = 0;
     private TNetworkAddress address;
-    private PUniqueId finstId;
+    private Types.PUniqueId finstId;
     private Long backendId;
     private Thread currentThread;
 
     public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) {
-        this.finstId = new PUniqueId();
-        this.finstId.hi = tid.hi;
-        this.finstId.lo = tid.lo;
+        this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
         this.timeoutTs = System.currentTimeMillis() + timeoutMs;
@@ -65,11 +62,14 @@ public class ResultReceiver {
         final RowBatch rowBatch = new RowBatch();
         try {
             while (!isDone && !isCancel) {
-                PFetchDataRequest request = new PFetchDataRequest(finstId);
-
+                InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
+                        .setFinstId(finstId)
+                        .setRespInAttachment(false)
+                        .build();
+                
                 currentThread = Thread.currentThread();
-                Future<PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
-                PFetchDataResult pResult = null;
+                Future<InternalService.PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
+                InternalService.PFetchDataResult pResult = null;
                 while (pResult == null) {
                     long currentTs = System.currentTimeMillis();
                     if (currentTs >= timeoutTs) {
@@ -86,30 +86,34 @@ public class ResultReceiver {
                         }
                     }
                 }
-                TStatusCode code = TStatusCode.findByValue(pResult.status.status_code);
+                TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
-                    status.setPstatus(pResult.status);
+                    status.setPstatus(pResult.getStatus());
                     return null;
                 } 
  
-                rowBatch.setQueryStatistics(pResult.query_statistics);
+                rowBatch.setQueryStatistics(pResult.getQueryStatistics());
 
-                if (packetIdx != pResult.packet_seq) {
-                    LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.packet_seq);
+                if (packetIdx != pResult.getPacketSeq()) {
+                    LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.getPacketSeq());
                     status.setRpcStatus("receive error packet");
                     return null;
                 }
     
                 packetIdx++;
-                isDone = pResult.eos;
+                isDone = pResult.getEos();
 
-                byte[] serialResult = request.getSerializedResult();
-                if (serialResult != null && serialResult.length > 0) {
+                if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
+                    LOG.info("receive first empty row batch");
+                    rowBatch.setEos(false);
+                    return rowBatch;
+                } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
+                    byte[] serialResult = pResult.getRowBatch().toByteArray();
                     TResultBatch resultBatch = new TResultBatch();
                     TDeserializer deserializer = new TDeserializer();
                     deserializer.deserialize(resultBatch, serialResult);
                     rowBatch.setBatch(resultBatch);
-                    rowBatch.setEos(pResult.eos);
+                    rowBatch.setEos(pResult.getEos());
                     return rowBatch;
                 }
             }
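
The fetch loop above reduces to polling a Future against a wall-clock deadline, so a hung backend cannot block the receiver forever. A minimal, self-contained sketch of that pattern (plain java.util.concurrent types; none of the Doris classes are used):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class DeadlinePoll {
        // Wait for a future until an absolute deadline; each get() is bounded
        // by the time remaining, mirroring the loop in ResultReceiver.getNext().
        static <T> T getBeforeDeadline(Future<T> future, long deadlineTs) throws Exception {
            while (true) {
                long now = System.currentTimeMillis();
                if (now >= deadlineTs) {
                    throw new TimeoutException("query timeout");
                }
                try {
                    return future.get(deadlineTs - now, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // loop once more; the deadline check above will throw
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Future<String> f = CompletableFuture.supplyAsync(() -> "row batch");
            System.out.println(getBeforeDeadline(f, System.currentTimeMillis() + 1000));
        }
    }
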
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
index b6dfce9..087babb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data.PQueryStatistics;
 import org.apache.doris.thrift.TResultBatch;
 
 public final class RowBatch {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4a361ec..0559773 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -70,31 +70,32 @@ import org.apache.doris.mysql.MysqlEofPacket;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.Planner;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
-import org.apache.doris.qe.cache.CacheBeProxy;
-import org.apache.doris.qe.cache.CacheProxy;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionCommitFailedException;
 import org.apache.doris.transaction.TransactionStatus;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.glassfish.jersey.internal.guava.Sets;
-
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.glassfish.jersey.internal.guava.Sets;
 
 import java.io.IOException;
 import java.io.StringReader;
@@ -127,7 +128,7 @@ public class StmtExecutor {
     private Planner planner;
     private boolean isProxy;
     private ShowResultSet proxyResultSet = null;
-    private PQueryStatistics statisticsForAuditLog;
+    private Data.PQueryStatistics.Builder statisticsForAuditLog;
     private boolean isCached;
 
     private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
@@ -626,13 +627,21 @@ public class StmtExecutor {
     // return true if the meta fields have been sent; otherwise, return false.
     // the meta fields must be sent right before the first batch of data (or the eos flag).
     // so if there is data (or eos is true), this method must return true.
-    private boolean sendCachedValues(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues,
+    private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues,
                                      SelectStmt selectStmt, boolean isSendFields, boolean isEos)
             throws Exception {
         RowBatch batch = null;
         boolean isSend = isSendFields;
-        for (CacheBeProxy.CacheValue value : cacheValues) {
-            batch = value.getRowBatch();
+        for (InternalService.PCacheValue value : cacheValues) {
+            TResultBatch resultBatch = new TResultBatch();
+            for (ByteString one : value.getRowsList()) {
+                resultBatch.addToRows(ByteBuffer.wrap(one.toByteArray()));
+            }
+            resultBatch.setPacketSeq(1);
+            resultBatch.setIsCompressed(false);
+            batch = new RowBatch();
+            batch.setBatch(resultBatch);
+            batch.setEos(true);
             if (!isSend) {
                 // send meta fields before sending first data batch.
                 sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
@@ -646,7 +655,7 @@ public class StmtExecutor {
 
         if (isEos) {
             if (batch != null) {
-                statisticsForAuditLog = batch.getQueryStatistics();
+                statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
             }
             if (!isSend) {
                 sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
@@ -662,20 +671,20 @@ public class StmtExecutor {
      */
     private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception {
         RowBatch batch = null;
-        CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
+        InternalService.PFetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
         CacheMode mode = cacheAnalyzer.getCacheMode();
         SelectStmt newSelectStmt = selectStmt;
         boolean isSendFields = false;
         if (cacheResult != null) {
             isCached = true;
             if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
-                sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, true);
+                sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, true);
                 return;
             }
             // rewrite sql
             if (mode == CacheMode.Partition) {
                 if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
-                    isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
+                    isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
                 }
                 newSelectStmt = cacheAnalyzer.getRewriteStmt();
                 newSelectStmt.reset();
@@ -710,7 +719,7 @@ public class StmtExecutor {
         }
         
         if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
-            isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
+            isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
         }
 
         cacheAnalyzer.updateCache();
@@ -720,7 +729,7 @@ public class StmtExecutor {
             isSendFields = true;
         }
 
-        statisticsForAuditLog = batch.getQueryStatistics();
+        statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
         context.getState().setEof();
         return;
     }
@@ -802,7 +811,7 @@ public class StmtExecutor {
             }
         }
 
-        statisticsForAuditLog = batch.getQueryStatistics();
+        statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
         context.getState().setEof();
         plannerProfile.setQueryFetchResultFinishTime();
     }
@@ -1110,20 +1119,20 @@ public class StmtExecutor {
         context.getCatalog().getExportMgr().addExportJob(exportStmt);
     }
 
-    public PQueryStatistics getQueryStatisticsForAuditLog() {
+    public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
         if (statisticsForAuditLog == null) {
-            statisticsForAuditLog = new PQueryStatistics();
+            statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
         }
-        if (statisticsForAuditLog.scan_bytes == null) {
-            statisticsForAuditLog.scan_bytes = 0L;
+        if (!statisticsForAuditLog.hasScanBytes()) {
+            statisticsForAuditLog.setScanBytes(0L);
         }
-        if (statisticsForAuditLog.scan_rows == null) {
-            statisticsForAuditLog.scan_rows = 0L;
+        if (!statisticsForAuditLog.hasScanRows()) {
+            statisticsForAuditLog.setScanRows(0L);
         }
-        if (statisticsForAuditLog.cpu_ms == null) {
-            statisticsForAuditLog.cpu_ms = 0L;
+        if (!statisticsForAuditLog.hasCpuMs()) {
+            statisticsForAuditLog.setCpuMs(0L);
         }
-        return statisticsForAuditLog;
+        return statisticsForAuditLog.build();
     }
 
     private List<PrimitiveType> exprToType(List<Expr> exprs) {
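
A note on the audit-statistics change above: the old jprotobuf classes exposed nullable public fields (scan_bytes == null meant unset), while generated protobuf builders track field presence with hasX() and fill in defaults with setX() before build(). A rough stand-in for that defaulting pattern, written against a hand-rolled builder so the snippet compiles without the generated Doris classes:

    // Hypothetical stand-in for a generated protobuf builder; the real
    // Data.PQueryStatistics.Builder tracks presence bits the same way.
    class StatsBuilder {
        private Long scanBytes; // null means "not set", like an unset optional field

        boolean hasScanBytes() { return scanBytes != null; }
        StatsBuilder setScanBytes(long v) { scanBytes = v; return this; }
        long build() { return scanBytes; } // a real build() returns an immutable message
    }

    public class BuilderDefaults {
        public static void main(String[] args) {
            StatsBuilder b = new StatsBuilder();
            if (!b.hasScanBytes()) {
                b.setScanBytes(0L); // default unset fields before build(), as above
            }
            System.out.println(b.build()); // prints 0
        }
    }
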
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
index c2a054e..1711179 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe.cache;
 import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Status;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.logging.log4j.LogManager;
@@ -50,7 +51,7 @@ public abstract class Cache {
         hitRange = HitRange.None;
     }
 
-    public abstract CacheProxy.FetchCacheResult getCacheData(Status status);
+    public abstract InternalService.PFetchCacheResult getCacheData(Status status);
 
     public HitRange getHitRange() {
         return hitRange;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index ddd2725..d8a6f47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -40,6 +40,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TUniqueId;
@@ -267,8 +268,7 @@ public class CacheAnalyzer {
         return CacheMode.Partition;
     }
 
-    public CacheBeProxy.FetchCacheResult getCacheData() {
-        CacheProxy.FetchCacheResult cacheResult = null;
+    public InternalService.PFetchCacheResult getCacheData() {
         cacheMode = innerCheckCacheMode(0);
         if (cacheMode == CacheMode.NoNeed) {
             return null;
@@ -277,13 +277,18 @@ public class CacheAnalyzer {
             return null;
         }
         Status status = new Status();
-        cacheResult = cache.getCacheData(status);
-
-        if (status.ok() && cacheResult != null) {
+        InternalService.PFetchCacheResult cacheResult = cache.getCacheData(status);
+        if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+            int rowCount = 0;
+            int dataSize = 0;
+            for (InternalService.PCacheValue value : cacheResult.getValuesList()) {
+                rowCount += value.getRowsCount();
+                dataSize += value.getDataSize();
+            }
             LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
                     cacheMode, DebugUtil.printId(queryId),
-                    cacheResult.all_count, cacheResult.value_count,
-                    cacheResult.row_count, cacheResult.data_size);
+                    cacheResult.getAllCount(), cacheResult.getValuesCount(),
+                    rowCount, dataSize);
         } else {
             LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
                     DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
index 064c86b..d7e5eda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
@@ -18,14 +18,8 @@
 package org.apache.doris.qe.cache;
 
 import org.apache.doris.common.Status;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCacheStatus;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PClearType;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.SimpleScheduler;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -48,8 +42,8 @@ import java.util.concurrent.TimeoutException;
 public class CacheBeProxy extends CacheProxy {
     private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
 
-    public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status) {
-        PUniqueId sqlKey = request.sql_key;
+    public void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status) {
+        Types.PUniqueId sqlKey = request.getSqlKey();
         Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
         if (backend == null) {
             LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
@@ -57,13 +51,13 @@ public class CacheBeProxy extends CacheProxy {
         }
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
         try {
-            PUpdateCacheRequest updateRequest = request.getRpcRequest();
-            Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
-            PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
-            if (response.status == PCacheStatus.CACHE_OK) {
+            Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance()
+                    .updateCache(address, request);
+            InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
+            if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
                 status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
             } else {
-                status.setStatus(response.status.toString());
+                status.setStatus(response.getStatus().toString());
             }
         } catch (Exception e) {
             LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
@@ -72,35 +66,18 @@ public class CacheBeProxy extends CacheProxy {
         }
     }
 
-    public FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status) {
-        PUniqueId sqlKey = request.sql_key;
+    public InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request,
+                                                        int timeoutMs, Status status) {
+        Types.PUniqueId sqlKey = request.getSqlKey();
         Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
         if (backend == null) {
             return null;
         }
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-        long timeoutTs = System.currentTimeMillis() + timeoutMs;
-        FetchCacheResult result = null;
         try {
-            PFetchCacheRequest fetchRequest = request.getRpcRequest();
-            Future<PFetchCacheResult> future = BackendServiceProxy.getInstance().fetchCache(address, fetchRequest);
-            PFetchCacheResult fetchResult = null;
-            while (fetchResult == null) {
-                long currentTs = System.currentTimeMillis();
-                if (currentTs >= timeoutTs) {
-                    throw new TimeoutException("query cache timeout");
-                }
-                fetchResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
-                if (fetchResult.status == PCacheStatus.CACHE_OK) {
-                    status = new Status(TStatusCode.OK, "");
-                    result = new FetchCacheResult();
-                    result.setResult(fetchResult);
-                    return result;
-                } else {
-                    status.setStatus(fetchResult.status.toString());
-                    return null;
-                }
-            }
+            Future<InternalService.PFetchCacheResult> future = BackendServiceProxy.getInstance()
+                    .fetchCache(address, request);
+            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (RpcException e) {
             LOG.warn("fetch cache rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
             status.setRpcStatus(e.getMessage());
@@ -114,16 +91,15 @@ public class CacheBeProxy extends CacheProxy {
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
             status.setStatus("query timeout");
-        } finally {
         }
-        return result;
+        return null;
     }
 
-    public void clearCache(PClearCacheRequest request) {
+    public void clearCache(InternalService.PClearCacheRequest request) {
         this.clearCache(request, CacheCoordinator.getInstance().getBackendList());
     }
 
-    public void clearCache(PClearCacheRequest request, List<Backend> beList) {
+    public void clearCache(InternalService.PClearCacheRequest request, List<Backend> beList) {
         int retry;
         Status status = new Status();
         for (Backend backend : beList) {
@@ -143,18 +119,18 @@ public class CacheBeProxy extends CacheProxy {
         }
     }
 
-    protected boolean clearCache(PClearCacheRequest request, Backend backend, int timeoutMs, Status status) {
+    protected boolean clearCache(InternalService.PClearCacheRequest request, Backend backend, int timeoutMs, Status status) {
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
         try {
-            request.clear_type = PClearType.CLEAR_ALL;
+            request = request.toBuilder().setClearType(InternalService.PClearType.CLEAR_ALL).build();
             LOG.info("clear all backend cache, backendId {}", backend.getId());
-            Future<PCacheResponse> future = BackendServiceProxy.getInstance().clearCache(address, request);
-            PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
-            if (response.status == PCacheStatus.CACHE_OK) {
+            Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance().clearCache(address, request);
+            InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
+            if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
                 status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
                 return true;
             } else {
-                status.setStatus(response.status.toString());
+                status.setStatus(response.getStatus().toString());
                 return false;
             }
         } catch (Exception e) {
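
Each of these proxy methods has the same shape: bound a gRPC future with a relative timeout and map every failure mode onto a Status. A self-contained sketch of that shape (CompletableFuture stands in for the gRPC future, and the Status object is reduced to a plain string):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedRpc {
        // Wait at most timeoutMs; translate each failure mode to a message,
        // mirroring the catch blocks in CacheBeProxy.
        static String call(Future<String> future, int timeoutMs) {
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            } catch (ExecutionException e) {
                return "rpc failed: " + e.getCause();
            } catch (TimeoutException e) {
                return "query timeout";
            }
        }

        public static void main(String[] args) {
            System.out.println(call(CompletableFuture.completedFuture("CACHE_OK"), 100));
            System.out.println(call(new CompletableFuture<>(), 100)); // never completes -> timeout
        }
    }
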
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
index 97a7301..dbbb403 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
@@ -18,7 +18,7 @@
 package org.apache.doris.qe.cache;
 
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.SimpleScheduler;
 import org.apache.doris.system.Backend;
 
@@ -68,13 +68,13 @@ public class CacheCoordinator {
      * @param sqlKey the 128-bit MD5 of the SQL text
      * @return Backend
      */
-    public Backend findBackend(PUniqueId sqlKey) {
+    public Backend findBackend(Types.PUniqueId sqlKey) {
         resetBackend();
         Backend virtualNode = null;
         try {
             belock.lock();
-            SortedMap<Long, Backend> headMap = virtualNodes.headMap(sqlKey.hi);
-            SortedMap<Long, Backend> tailMap = virtualNodes.tailMap(sqlKey.hi);
+            SortedMap<Long, Backend> headMap = virtualNodes.headMap(sqlKey.getHi());
+            SortedMap<Long, Backend> tailMap = virtualNodes.tailMap(sqlKey.getHi());
             int retryTimes = 0;
             while (true) {
                 if (tailMap == null || tailMap.size() == 0) {
@@ -131,9 +131,9 @@ public class CacheCoordinator {
             if (!idToBackend.containsKey(bid)) {
                 for (int i = 0; i < VIRTUAL_NODES; i++) {
                     String nodeName = String.valueOf(bid) + "::" + String.valueOf(i);
-                    PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
-                    virtualNodes.remove(nodeId.hi);
-                    LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.hi);
+                    Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
+                    virtualNodes.remove(nodeId.getHi());
+                    LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.getHi());
                 }
                 itr.remove();
             }
@@ -147,9 +147,9 @@ public class CacheCoordinator {
         realNodes.put(backend.getId(), backend);
         for (int i = 0; i < VIRTUAL_NODES; i++) {
             String nodeName = String.valueOf(backend.getId()) + "::" + String.valueOf(i);
-            PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
-            virtualNodes.put(nodeId.hi, backend);
-            LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.hi);
+            Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
+            virtualNodes.put(nodeId.getHi(), backend);
+            LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.getHi());
         }
     }
 
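
The virtual-node logic above is a standard consistent-hash ring: each backend is hashed VIRTUAL_NODES times into a sorted map, and a sql key is routed to the first virtual node at or after its hash, wrapping around at the end of the ring. A compact, self-contained version (String backends and String.hashCode stand in for Backend and the MD5-based PUniqueId):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class HashRing {
        private static final int VIRTUAL_NODES = 10;
        private final TreeMap<Long, String> ring = new TreeMap<>();

        void addBackend(String backend) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                // CacheCoordinator hashes "<id>::<i>" with MD5; hashCode() is a stand-in
                ring.put((long) (backend + "::" + i).hashCode(), backend);
            }
        }

        String findBackend(long keyHash) {
            // first virtual node at or after the key, wrapping to the start of the ring
            SortedMap<Long, String> tail = ring.tailMap(keyHash);
            return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
        }

        public static void main(String[] args) {
            HashRing r = new HashRing();
            r.addBackend("be-1");
            r.addBackend("be-2");
            System.out.println(r.findBackend(42L));
        }
    }
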
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
index a750737..f9664d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
@@ -18,25 +18,14 @@
 package org.apache.doris.qe.cache;
 
 import org.apache.doris.common.Status;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.proto.PCacheParam;
-import org.apache.doris.proto.PCacheValue;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
-import org.apache.doris.qe.RowBatch;
-import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 
-import com.google.common.collect.Lists;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.List;
 
 /**
  * It encapsulates the request and response parameters and methods,
@@ -48,203 +37,6 @@ public abstract class CacheProxy {
     public static int UPDATE_TIMEOUT = 10000;
     public static int CLEAR_TIMEOUT = 30000;
 
-    public static class CacheParam extends PCacheParam {
-        public CacheParam(PCacheParam param) {
-            partition_key = param.partition_key;
-            last_version = param.last_version;
-            last_version_time = param.last_version_time;
-        }
-
-        public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
-            partition_key = partitionKey;
-            last_version = lastVersion;
-            last_version_time = lastVersionTime;
-        }
-
-        public PCacheParam getParam() {
-            PCacheParam param = new PCacheParam();
-            param.partition_key = partition_key;
-            param.last_version = last_version;
-            param.last_version_time = last_version_time;
-            return param;
-        }
-
-        public void debug() {
-            LOG.info("cache param, part key {}, version {}, time {}",
-                    partition_key, last_version, last_version_time);
-        }
-    }
-
-    public static class CacheValue extends PCacheValue {
-        public CacheParam param;
-        public TResultBatch resultBatch;
-
-        public CacheValue() {
-            param = null;
-            rows = Lists.newArrayList();
-            data_size = 0;
-            resultBatch = new TResultBatch();
-        }
-
-        public void addRpcResult(PCacheValue value) {
-            param = new CacheParam(value.param);
-            data_size += value.data_size;
-            rows.addAll(value.rows);
-        }
-
-        public RowBatch getRowBatch() {
-            for (byte[] one : rows) {
-                resultBatch.addToRows(ByteBuffer.wrap(one));
-            }
-            RowBatch batch = new RowBatch();
-            resultBatch.setPacketSeq(1);
-            resultBatch.setIsCompressed(false);
-            batch.setBatch(resultBatch);
-            batch.setEos(true);
-            return batch;
-        }
-
-        public void addUpdateResult(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
-            param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
-            for (byte[] buf : rowList) {
-                data_size += buf.length;
-                rows.add(buf);
-            }
-        }
-
-        public PCacheValue getRpcValue() {
-            PCacheValue value = new PCacheValue();
-            value.param = param.getParam();
-            value.data_size = data_size;
-            value.rows = rows;
-            return value;
-        }
-
-        public void debug() {
-            LOG.info("cache value, partkey {}, ver:{}, time {}, row_num {}, data_size {}",
-                    param.partition_key, param.last_version, param.last_version_time,
-                    rows.size(),
-                    data_size);
-            for (int i = 0; i < rows.size(); i++) {
-                LOG.info("{}:{}", i, rows.get(i));
-            }
-        }
-    }
-
-    public static class UpdateCacheRequest extends PUpdateCacheRequest {
-        public int value_count;
-        public int row_count;
-        public int data_size;
-        private List<CacheValue> valueList;
-
-        public UpdateCacheRequest(String sqlStr) {
-            this.sql_key = getMd5(sqlStr);
-            this.valueList = Lists.newArrayList();
-            value_count = 0;
-            row_count = 0;
-            data_size = 0;
-        }
-
-        public void addValue(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
-            CacheValue value = new CacheValue();
-            value.addUpdateResult(partitionKey, lastVersion, lastVersionTime, rowList);
-            valueList.add(value);
-            value_count++;
-        }
-
-        public PUpdateCacheRequest getRpcRequest() {
-            value_count = valueList.size();
-            PUpdateCacheRequest request = new PUpdateCacheRequest();
-            request.values = Lists.newArrayList();
-            request.sql_key = sql_key;
-            for (CacheValue value : valueList) {
-                request.values.add(value.getRpcValue());
-                row_count += value.rows.size();
-                data_size = value.data_size;
-            }
-            return request;
-        }
-
-        public void debug() {
-            LOG.info("update cache request, sql_key {}, value_size {}", DebugUtil.printId(sql_key),
-                    valueList.size());
-            for (CacheValue value : valueList) {
-                value.debug();
-            }
-        }
-    }
-
-
-    public static class FetchCacheRequest extends PFetchCacheRequest {
-        private List<CacheParam> paramList;
-
-        public FetchCacheRequest(String sqlStr) {
-            this.sql_key = getMd5(sqlStr);
-            this.paramList = Lists.newArrayList();
-        }
-
-        public void addParam(long partitionKey, long lastVersion, long lastVersionTime) {
-            CacheParam param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
-            paramList.add(param);
-        }
-
-        public PFetchCacheRequest getRpcRequest() {
-            PFetchCacheRequest request = new PFetchCacheRequest();
-            request.params = Lists.newArrayList();
-            request.sql_key = sql_key;
-            for (CacheParam param : paramList) {
-                request.params.add(param.getParam());
-            }
-            return request;
-        }
-
-        public void debug() {
-            LOG.info("fetch cache request, sql_key {}, param count {}", DebugUtil.printId(sql_key), paramList.size());
-            for (CacheParam param : paramList) {
-                param.debug();
-            }
-        }
-    }
-
-    public static class FetchCacheResult extends PFetchCacheResult {
-        public int all_count;
-        public int value_count;
-        public int row_count;
-        public int data_size;
-        private List<CacheValue> valueList;
-
-        public FetchCacheResult() {
-            valueList = Lists.newArrayList();
-            all_count = 0;
-            value_count = 0;
-            row_count = 0;
-            data_size = 0;
-        }
-
-        public List<CacheValue> getValueList() {
-            return valueList;
-        }
-
-        public void setResult(PFetchCacheResult rpcResult) {
-            value_count = rpcResult.values.size();
-            for (int i = 0; i < rpcResult.values.size(); i++) {
-                PCacheValue rpcValue = rpcResult.values.get(i);
-                CacheValue value = new CacheValue();
-                value.addRpcResult(rpcValue);
-                valueList.add(value);
-                row_count += value.rows.size();
-                data_size += value.data_size;
-            }
-        }
-
-        public void debug() {
-            LOG.info("fetch cache result, value size {}", valueList.size());
-            for (CacheValue value : valueList) {
-                value.debug();
-            }
-        }
-    }
-
     public enum CacheProxyType {
         FE,
         BE,
@@ -265,14 +57,15 @@ public abstract class CacheProxy {
         return null;
     }
 
-    public abstract void updateCache(UpdateCacheRequest request, int timeoutMs, Status status);
+    public abstract void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status);
 
-    public abstract FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status);
+    public abstract InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request,
+                                                                 int timeoutMs, Status status);
 
-    public abstract void clearCache(PClearCacheRequest clearRequest);
+    public abstract void clearCache(InternalService.PClearCacheRequest clearRequest);
 
 
-    public static PUniqueId getMd5(String str) {
+    public static Types.PUniqueId getMd5(String str) {
         MessageDigest msgDigest;
         try {
             //128 bit
@@ -281,9 +74,10 @@ public abstract class CacheProxy {
             return null;
         }
         final byte[] digest = msgDigest.digest(str.getBytes());
-        PUniqueId key = new PUniqueId();
-        key.lo = getLongFromByte(digest, 0);//64 bit
-        key.hi = getLongFromByte(digest, 8);//64 bit
+        Types.PUniqueId key = Types.PUniqueId.newBuilder()
+                .setLo(getLongFromByte(digest, 0))
+                .setHi(getLongFromByte(digest, 8))
+                .build();
         return key;
     }
 
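
getMd5 packs the 16-byte MD5 digest of the SQL text into the two 64-bit halves of a PUniqueId. An equivalent, self-contained version using ByteBuffer in place of the getLongFromByte helper (the little-endian byte order here is an assumption; the real helper defines the actual order):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class SqlKey {
        // Returns {lo, hi}: the two 64-bit halves of the 128-bit MD5 of str.
        static long[] md5Key(String str) throws NoSuchAlgorithmException {
            byte[] digest = MessageDigest.getInstance("MD5").digest(str.getBytes());
            ByteBuffer buf = ByteBuffer.wrap(digest).order(ByteOrder.LITTLE_ENDIAN);
            return new long[] {buf.getLong(0), buf.getLong(8)};
        }

        public static void main(String[] args) throws Exception {
            long[] key = md5Key("SELECT 1");
            System.out.printf("lo=%x hi=%x%n", key[0], key[1]);
        }
    }
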
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
index a820eb6..d208d77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TUniqueId;
 
@@ -38,6 +39,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class PartitionCache extends Cache {
     private static final Logger LOG = LogManager.getLogger(PartitionCache.class);
@@ -73,33 +75,33 @@ public class PartitionCache extends Cache {
         this.newRangeList = Lists.newArrayList();
     }
 
-    public CacheProxy.FetchCacheResult getCacheData(Status status) {
-        CacheProxy.FetchCacheRequest request;
+    public InternalService.PFetchCacheResult getCacheData(Status status) {
+
         rewriteSelectStmt(null);
-        request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql());
         range = new PartitionRange(this.partitionPredicate, this.olapTable,
                 this.partitionInfo);
         if (!range.analytics()) {
             status.setStatus("analytics range error");
             return null;
         }
-
-        for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) {
-            request.addParam(single.getCacheKey().realValue(),
-                    single.getPartition().getVisibleVersion(),
-                    single.getPartition().getVisibleVersionTime()
-            );
-        }
-
-        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
-        if (status.ok() && cacheResult != null) {
-            cacheResult.all_count = range.getPartitionSingleList().size();
-            for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) {
-                range.setCacheFlag(value.param.partition_key);
+        InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
+                .setSqlKey(CacheProxy.getMd5(nokeyStmt.toSql()))
+                .addAllParams(range.getPartitionSingleList().stream().map(
+                        p -> InternalService.PCacheParam.newBuilder()
+                                .setPartitionKey(p.getCacheKey().realValue())
+                                .setLastVersion(p.getPartition().getVisibleVersion())
+                                .setLastVersionTime(p.getPartition().getVisibleVersionTime())
+                                .build()).collect(Collectors.toList())
+                ).build();
+        InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
+        if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+            for (InternalService.PCacheValue value : cacheResult.getValuesList()) {
+                range.setCacheFlag(value.getParam().getPartitionKey());
             }
+            cacheResult = cacheResult.toBuilder().setAllCount(range.getPartitionSingleList().size()).build();
             MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L);
             MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size());
-            MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size());
+            MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValuesList().size());
         }
 
         range.setTooNewByID(latestTable.latestPartitionId);
@@ -125,15 +127,21 @@ public class PartitionCache extends Cache {
             return;
         }
 
-        CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
-        if (updateRequest.value_count > 0) {
+        InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
+        if (updateRequest.getValuesCount() > 0) {
             CacheBeProxy proxy = new CacheBeProxy();
             Status status = new Status();
             proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
+            int rowCount = 0;
+            int dataSize = 0;
+            for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
+                rowCount += value.getRowsCount();
+                dataSize += value.getDataSize();
+            }
             LOG.info("update cache mode {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
                     CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId),
-                    DebugUtil.printId(updateRequest.sql_key),
-                    updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
+                    DebugUtil.printId(updateRequest.getSqlKey()),
+                    updateRequest.getValuesCount(), rowCount, dataSize);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index e7f3a3a..444cef2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -17,11 +17,15 @@
 
 package org.apache.doris.qe.cache;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.RowBatch;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -30,14 +34,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
- *  According to the query partition range and cache hit, the rowbatch to update the cache is constructed
+ * Constructs the row batch used to update the cache, according to the query partition range and cache hits
  */
 public class RowBatchBuilder {
     private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);
 
-    private CacheBeProxy.UpdateCacheRequest updateRequest;
+    private InternalService.PUpdateCacheRequest updateRequest;
     private CacheAnalyzer.CacheMode cacheMode;
     private int keyIndex;
     private Type keyType;
@@ -63,8 +68,8 @@ public class RowBatchBuilder {
     }
 
     public void buildPartitionIndex(ArrayList<Expr> resultExpr,
-                               List<String> columnLabel, Column partColumn,
-                               List<PartitionRange.PartitionSingle> newSingleList) {
+                                    List<String> columnLabel, Column partColumn,
+                                    List<PartitionRange.PartitionSingle> newSingleList) {
         if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
             return;
         }
@@ -95,11 +100,19 @@ public class RowBatchBuilder {
         }
     }
 
-    public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) {
+    public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long latestTime) {
         if (updateRequest == null) {
-            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
         }
-        updateRequest.addValue(partitionKey, lastVersion, lastestTime, rowList);
+        updateRequest = updateRequest.toBuilder()
+                .addValues(InternalService.PCacheValue.newBuilder()
+                        .setParam(InternalService.PCacheParam.newBuilder()
+                                .setPartitionKey(partitionKey)
+                                .setLastVersion(lastVersion)
+                                .setLastVersionTime(latestTime)
+                                .build()).setDataSize(dataSize).addAllRows(
+                                rowList.stream().map(row -> ByteString.copyFrom(row))
+                                        .collect(Collectors.toList()))).build();
         return updateRequest;
     }
 
@@ -115,7 +128,7 @@ public class RowBatchBuilder {
             if (i == index) {
                 byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
                 String str = new String(content);
-                key.init(type, str.toString());
+                key.init(type, str);
             }
         }
         return key;
@@ -124,9 +137,9 @@ public class RowBatchBuilder {
     /**
      * Splits the row batch into rows, grouped by partition
      */
-    public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
+    public InternalService.PUpdateCacheRequest buildPartitionUpdateRequest(String sql) {
         if (updateRequest == null) {
-            updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
         }
         HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
         List<byte[]> partitionRowList;
@@ -150,9 +163,17 @@ public class RowBatchBuilder {
             Long key = entry.getKey();
             PartitionRange.PartitionSingle partition = cachePartMap.get(key);
             partitionRowList = entry.getValue();
-            updateRequest.addValue(key, partition.getPartition().getVisibleVersion(),
-                    partition.getPartition().getVisibleVersionTime(), partitionRowList);
+            updateRequest = updateRequest.toBuilder()
+                    .addValues(InternalService.PCacheValue.newBuilder()
+                            .setParam(InternalService.PCacheParam.newBuilder()
+                                    .setPartitionKey(key)
+                                    .setLastVersion(partition.getPartition().getVisibleVersion())
+                                    .setLastVersionTime(partition.getPartition().getVisibleVersionTime())
+                                    .build()).addAllRows(
+                                    partitionRowList.stream().map(row -> ByteString.copyFrom(row))
+                                            .collect(Collectors.toList()))).build();
         }
         return updateRequest;
     }
 }
+
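
One design note on the two build*UpdateRequest methods: generated protobuf messages are immutable, so every updateRequest.toBuilder()...build() round trip copies the accumulated message. When many values are appended, holding a single builder and calling build() once at the end avoids those copies. A hedged sketch of that alternative, with a hypothetical accumulator rather than the generated Doris builder:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Hypothetical accumulator illustrating "one builder, one build()" instead
    // of repeated toBuilder()/build() round trips on an immutable message.
    class UpdateRequestBuilder {
        private final List<String> values = new ArrayList<>();

        UpdateRequestBuilder addValue(String v) { values.add(v); return this; }
        List<String> build() { return Collections.unmodifiableList(new ArrayList<>(values)); }
    }

    public class AccumulateOnce {
        public static void main(String[] args) {
            UpdateRequestBuilder builder = new UpdateRequestBuilder();
            for (String row : new String[] {"row1", "row2", "row3"}) {
                builder.addValue(row); // O(1) append, no intermediate immutable copies
            }
            System.out.println(builder.build()); // build once at the end
        }
    }
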
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
index dd67c6e..493ca68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
@@ -21,8 +21,10 @@ import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.RowBatch;
 import org.apache.doris.thrift.TUniqueId;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -37,13 +39,18 @@ public class SqlCache extends Cache {
         this.latestTable = latestTable;
     }
 
-    public CacheProxy.FetchCacheResult getCacheData(Status status) {
-        CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql());
-        request.addParam(latestTable.latestPartitionId, latestTable.latestVersion,
-                latestTable.latestTime);
-        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
-        if (status.ok() && cacheResult != null) {
-            cacheResult.all_count = 1;
+    public InternalService.PFetchCacheResult getCacheData(Status status) {
+        InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
+                .setSqlKey(CacheProxy.getMd5(selectStmt.toSql()))
+                .addParams(InternalService.PCacheParam.newBuilder()
+                        .setPartitionKey(latestTable.latestPartitionId)
+                        .setLastVersion(latestTable.latestVersion)
+                        .setLastVersionTime(latestTable.latestTime))
+                .build();
+
+        InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
+        if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+            cacheResult = cacheResult.toBuilder().setAllCount(1).build();
             MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
             hitRange = HitRange.Full;
         }
@@ -66,15 +73,21 @@ public class SqlCache extends Cache {
             return;
         }
 
-        CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
+        InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
                 latestTable.latestPartitionId, latestTable.latestVersion, latestTable.latestTime);
-        if (updateRequest.value_count > 0) {
+        if (updateRequest.getValuesCount() > 0) {
             CacheBeProxy proxy = new CacheBeProxy();
             Status status = new Status();
             proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
+            int rowCount = 0;
+            int dataSize = 0;
+            for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
+                rowCount += value.getRowsCount();
+                dataSize += value.getDataSize();
+            }
             LOG.info("update cache mode {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
-                    CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key),
-                    updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
+                    CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.getSqlKey()),
+                    updateRequest.getValuesCount(), rowCount, dataSize);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
new file mode 100644
index 0000000..e098a0d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.PBackendServiceGrpc;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import java.util.concurrent.Future;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+
+public class BackendServiceClient {
+    private static final int MAX_RETRY_NUM = 3;
+    private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
+    private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
+    private final ManagedChannel channel;
+
+    public BackendServiceClient(TNetworkAddress address) {
+        channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
+                .flowControlWindow(Config.grpc_max_message_size_bytes)
+                .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
+                .enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
+                .usePlaintext().build();
+        stub = PBackendServiceGrpc.newFutureStub(channel);
+        blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
+            InternalService.PExecPlanFragmentRequest request) {
+        return stub.execPlanFragment(request);
+    }
+
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+            InternalService.PCancelPlanFragmentRequest request) {
+        return stub.cancelPlanFragment(request);
+    }
+
+    public Future<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
+        return stub.fetchData(request);
+    }
+
+    public InternalService.PFetchDataResult fetchDataSync(InternalService.PFetchDataRequest request) {
+        return blockingStub.fetchData(request);
+    }
+
+    public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
+        return stub.updateCache(request);
+    }
+
+    public Future<InternalService.PFetchCacheResult> fetchCache(InternalService.PFetchCacheRequest request) {
+        return stub.fetchCache(request);
+    }
+
+    public Future<InternalService.PCacheResponse> clearCache(InternalService.PClearCacheRequest request) {
+        return stub.clearCache(request);
+    }
+
+    public Future<InternalService.PTriggerProfileReportResult> triggerProfileReport(
+            InternalService.PTriggerProfileReportRequest request) {
+        return stub.triggerProfileReport(request);
+    }
+
+    public Future<InternalService.PProxyResult> getInfo(InternalService.PProxyRequest request) {
+        return stub.getInfo(request);
+    }
+}
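
BackendServiceClient builds one Netty channel per backend address and layers a future stub (for the async RPCs) plus a blocking stub over it. gRPC channels are heavyweight and intended to be cached and reused, which is why BackendServiceProxy below keeps them in a map. The one lifecycle step the class does not show is shutdown; closing a channel cleanly looks roughly like this (host and port are placeholders):

    import java.util.concurrent.TimeUnit;

    import io.grpc.ManagedChannel;
    import io.grpc.netty.NettyChannelBuilder;

    public class ChannelLifecycle {
        public static void main(String[] args) throws InterruptedException {
            ManagedChannel channel = NettyChannelBuilder
                    .forAddress("127.0.0.1", 8060) // hypothetical BE brpc host/port
                    .usePlaintext()
                    .build();
            try {
                // ... create stubs and issue RPCs here ...
            } finally {
                // orderly shutdown: stop new calls, wait briefly for in-flight ones
                channel.shutdown();
                if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
                    channel.shutdownNow();
                }
            }
        }
    }
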
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5d2c3af..1c4d785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,22 +17,8 @@
 
 package org.apache.doris.rpc;
 
-import org.apache.doris.common.Config;
-import org.apache.doris.common.util.JdkUtils;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
@@ -40,37 +26,20 @@ import org.apache.doris.thrift.TUniqueId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 
-import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
-import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
-import com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClient;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClientOptions;
 import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
 
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.concurrent.Future;
 
 public class BackendServiceProxy {
     private static final Logger LOG = LogManager.getLogger(BackendServiceProxy.class);
-
-    private RpcClient rpcClient;
-    // TODO(zc): use TNetworkAddress,
-    private Map<TNetworkAddress, PBackendService> serviceMap;
-
     private static volatile BackendServiceProxy INSTANCE;
-
-    static {
-        int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
-        JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion)));
-    }
+    private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
 
     public BackendServiceProxy() {
-        final RpcClientOptions rpcOptions = new RpcClientOptions();
-        rpcOptions.setMaxWait(Config.brpc_idle_wait_max_time);
-        rpcOptions.setThreadPoolSize(Config.brpc_number_of_concurrent_requests_processed);
-        rpcClient = new RpcClient(rpcOptions);
         serviceMap = Maps.newHashMap();
     }
 
@@ -85,42 +54,24 @@ public class BackendServiceProxy {
         return INSTANCE;
     }
 
-    private synchronized PBackendService getProxy(TNetworkAddress address) {
-        PBackendService service = serviceMap.get(address);
+    private synchronized BackendServiceClient getProxy(TNetworkAddress address) {
+        BackendServiceClient service = serviceMap.get(address);
         if (service != null) {
             return service;
         }
-        ProtobufRpcProxy<PBackendService> proxy = new ProtobufRpcProxy(rpcClient, PBackendService.class);
-        proxy.setHost(address.getHostname());
-        proxy.setPort(address.getPort());
-        service = proxy.proxy();
+        service = new BackendServiceClient(address);
         serviceMap.put(address, service);
         return service;
     }
 
-    public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
             TNetworkAddress address, TExecPlanFragmentParams tRequest)
             throws TException, RpcException {
-        final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest();
-        pRequest.setRequest(tRequest);
+        final InternalService.PExecPlanFragmentRequest pRequest = InternalService.PExecPlanFragmentRequest.newBuilder()
+                .setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
         try {
-            final PBackendService service = getProxy(address);
-            return service.execPlanFragmentAsync(pRequest);
-        } catch (NoSuchElementException e) {
-            try {
-                // retry
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException interruptedException) {
-                    // do nothing
-                }
-                final PBackendService service = getProxy(address);
-                return service.execPlanFragmentAsync(pRequest);
-            } catch (NoSuchElementException noSuchElementException) {
-                LOG.warn("Execute plan fragment retry failed, address={}:{}",
-                        address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(address.hostname, e.getMessage());
-            }
+            final BackendServiceClient client = getProxy(address);
+            return client.execPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
             LOG.warn("Execute plan fragment caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -128,32 +79,17 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
-            TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException {
-        final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest();
-        PUniqueId uid = new PUniqueId();
-        uid.hi = finstId.hi;
-        uid.lo = finstId.lo;
-        pRequest.finst_id = uid;
-        pRequest.cancel_reason = cancelReason;
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+            TNetworkAddress address, TUniqueId finstId, InternalService.PPlanFragmentCancelReason cancelReason)
+            throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+                .newBuilder()
+                .setFinstId(
+                        Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
+                .setCancelReason(cancelReason).build();
         try {
-            final PBackendService service = getProxy(address);
-            return service.cancelPlanFragmentAsync(pRequest);
-        } catch (NoSuchElementException e) {
-            // retry
-            try {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException interruptedException) {
-                    // do nothing
-                }
-                final PBackendService service = getProxy(address);
-                return service.cancelPlanFragmentAsync(pRequest);
-            } catch (NoSuchElementException noSuchElementException) {
-                LOG.warn("Cancel plan fragment retry failed, address={}:{}",
-                        address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(address.hostname, e.getMessage());
-            }
+            final BackendServiceClient client = getProxy(address);
+            return client.cancelPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
             LOG.warn("Cancel plan fragment caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -161,11 +97,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PFetchDataResult> fetchDataAsync(
-            TNetworkAddress address, PFetchDataRequest request) throws RpcException {
+    public Future<InternalService.PFetchDataResult> fetchDataAsync(
+            TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.fetchDataAsync(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchDataAsync(request);
         } catch (Throwable e) {
             LOG.warn("fetch data caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -173,11 +109,23 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PCacheResponse> updateCache(
-            TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{
+    public InternalService.PFetchDataResult fetchDataSync(
+            TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.updateCache(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchDataSync(request);
+        } catch (Throwable e) {
+            LOG.warn("fetch data caught an exception, address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
+    public Future<InternalService.PCacheResponse> updateCache(
+            TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.updateCache(request);
         } catch (Throwable e) {
             LOG.warn("update cache caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -185,11 +133,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PFetchCacheResult> fetchCache(
-            TNetworkAddress address, PFetchCacheRequest request) throws RpcException {
+    public Future<InternalService.PFetchCacheResult> fetchCache(
+            TNetworkAddress address, InternalService.PFetchCacheRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.fetchCache(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.fetchCache(request);
         } catch (Throwable e) {
             LOG.warn("fetch cache caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -197,11 +145,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PCacheResponse> clearCache(
-            TNetworkAddress address, PClearCacheRequest request) throws RpcException {
+    public Future<InternalService.PCacheResponse> clearCache(
+            TNetworkAddress address, InternalService.PClearCacheRequest request) throws RpcException {
         try {
-            PBackendService service = getProxy(address);
-            return service.clearCache(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.clearCache(request);
         } catch (Throwable e) {
             LOG.warn("clear cache caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -209,12 +157,11 @@ public class BackendServiceProxy {
         }
     }
 
-
-    public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
-            TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
+    public Future<InternalService.PTriggerProfileReportResult> triggerProfileReportAsync(
+            TNetworkAddress address, InternalService.PTriggerProfileReportRequest request) throws RpcException {
         try {
-            final PBackendService service = getProxy(address);
-            return service.triggerProfileReport(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.triggerProfileReport(request);
         } catch (Throwable e) {
             LOG.warn("trigger profile report caught an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
@@ -222,11 +169,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<PProxyResult> getInfo(
-            TNetworkAddress address, PProxyRequest request) throws RpcException {
+    public Future<InternalService.PProxyResult> getInfo(
+            TNetworkAddress address, InternalService.PProxyRequest request) throws RpcException {
         try {
-            final PBackendService service = getProxy(address);
-            return service.getInfo(request);
+            final BackendServiceClient client = getProxy(address);
+            return client.getInfo(request);
         } catch (Throwable e) {
             LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
             throw new RpcException(address.hostname, e.getMessage());
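
The proxy above now delegates every call to a per-address BackendServiceClient, which this commit adds in fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java (its body is not shown in this section). For orientation, a minimal sketch of such a grpc wrapper, assuming the generated PBackendServiceGrpc stubs and a hypothetical class name; the committed class may differ in structure and naming:

    // Hypothetical sketch only; the real BackendServiceClient may differ.
    import org.apache.doris.proto.InternalService;
    import org.apache.doris.proto.PBackendServiceGrpc;
    import org.apache.doris.thrift.TNetworkAddress;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    import java.util.concurrent.Future;

    public class BackendServiceClientSketch {
        private final ManagedChannel channel;
        private final PBackendServiceGrpc.PBackendServiceFutureStub futureStub;
        private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;

        public BackendServiceClientSketch(TNetworkAddress address) {
            // One channel per backend address; the proxy caches one client
            // per address in serviceMap.
            channel = ManagedChannelBuilder.forAddress(address.getHostname(), address.getPort())
                    .usePlaintext()
                    .build();
            futureStub = PBackendServiceGrpc.newFutureStub(channel);
            blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
        }

        // The future stub returns ListenableFuture, which implements
        // java.util.concurrent.Future, so the proxy's async signatures carry over.
        public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
                InternalService.PExecPlanFragmentRequest request) {
            return futureStub.execPlanFragment(request);
        }

        public InternalService.PFetchDataResult fetchDataSync(
                InternalService.PFetchDataRequest request) {
            return blockingStub.fetchData(request);
        }
    }
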
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
deleted file mode 100644
index 6e95fe6..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUpdateCacheRequest;
-
-import com.baidu.jprotobuf.pbrpc.ProtobufRPC;
-
-import java.util.concurrent.Future;
-
-public interface PBackendService {
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
-            attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
-    Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "cancel_plan_fragment",
-            onceTalkTimeout = 5000)
-    Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(PCancelPlanFragmentRequest request);
-
-    // we set timeout to 1 day, because now there is no way to give different timeout for each RPC call
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_data",
-            attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000)
-    Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000)
-    Future<PCacheResponse>  updateCache(PUpdateCacheRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000)
-    Future<PFetchCacheResult> fetchCache(PFetchCacheRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000)
-    Future<PCacheResponse> clearCache(PClearCacheRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report",
-            attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
-    Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request);
-
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "get_info", onceTalkTimeout = 10000)
-    Future<PProxyResult> getInfo(PProxyRequest request);
-}
-
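
One behavioral detail in the deletion above: the old interface attached per-method timeouts via onceTalkTimeout. grpc expresses the same idea as a per-call deadline on the stub; whether the new BackendServiceClient does exactly this is not shown in this section. A minimal sketch under that assumption, with a hypothetical helper name:

    import org.apache.doris.proto.InternalService;
    import org.apache.doris.proto.PBackendServiceGrpc;

    import io.grpc.Channel;

    import java.util.concurrent.TimeUnit;

    class DeadlineSketch {
        // Rough equivalent of onceTalkTimeout = 10000 from the deleted interface:
        // the call fails with DEADLINE_EXCEEDED if no reply arrives within 10s.
        static InternalService.PCacheResponse clearCacheWithTimeout(
                Channel channel, InternalService.PClearCacheRequest request) {
            return PBackendServiceGrpc.newBlockingStub(channel)
                    .withDeadlineAfter(10, TimeUnit.SECONDS)
                    .clearCache(request);
        }
    }
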
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java
deleted file mode 100644
index a183795..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-
-@ProtobufClass
-public class PExecPlanFragmentRequest extends AttachmentRequest {
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java
deleted file mode 100644
index 7cad9d4..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PUniqueId;
-
-import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-
-@ProtobufClass
-public class PFetchDataRequest extends AttachmentRequest {
-    public PFetchDataRequest() {}
-    public PFetchDataRequest(PUniqueId finstId) {
-        this.finstId = finstId;
-    }
-    @Protobuf(order = 1, required = true)
-    public PUniqueId finstId;
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
deleted file mode 100644
index 2bfce90..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PUniqueId;
-
-import com.baidu.bjf.remoting.protobuf.FieldType;
-import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-@ProtobufClass
-public class PTriggerProfileReportRequest extends AttachmentRequest {
-
-    @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false)
-    List<PUniqueId> instanceIds;
-
-    public PTriggerProfileReportRequest() {
-    }
-
-    public PTriggerProfileReportRequest(List<PUniqueId> instanceIds) {
-        this.instanceIds = Lists.newArrayList();
-        this.instanceIds.addAll(instanceIds);
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java
deleted file mode 100644
index a6f05c9..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import com.baidu.jprotobuf.pbrpc.ClientAttachmentHandler;
-
-public class ThriftClientAttachmentHandler implements ClientAttachmentHandler {
-    @Override
-    public byte[] handleRequest(String serviceName, String methodName, Object... objects) {
-        AttachmentRequest request = (AttachmentRequest) objects[0];
-        return request.getSerializedRequest();
-    }
-    @Override
-    public void handleResponse(byte[] bytes, String serviceName, String methodName, Object... objects) {
-        AttachmentRequest result = (AttachmentRequest) objects[0];
-        result.setSerializedResult(bytes);
-    }
-}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index c6bd97c..e979c0c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1469,7 +1469,7 @@ public class QueryPlanTest {
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
         System.out.println("wangxixu-explain:"+explainString);
         Assert.assertTrue(explainString.contains("PREDICATES: `query_time` < 1614614400, `query_time` >= 0"));
-        
+
     }
 
     @Test
@@ -1493,6 +1493,3 @@ public class QueryPlanTest {
         }
     }
 }
-
-
-
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index 7421608..2a7fe57 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -28,7 +28,7 @@ import org.apache.doris.mysql.MysqlErrPacket;
 import org.apache.doris.mysql.MysqlOkPacket;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
 import org.apache.doris.thrift.TUniqueId;
 
 import org.junit.Assert;
@@ -55,7 +55,7 @@ public class ConnectProcessorTest {
     @Mocked
     private static SocketChannel socketChannel;
 
-    private static PQueryStatistics statistics = new PQueryStatistics();
+    private static Data.PQueryStatistics statistics = Data.PQueryStatistics.newBuilder().build();
 
     @BeforeClass
     public static void setUpClass() {
@@ -97,10 +97,7 @@ public class ConnectProcessorTest {
             serializer.writeEofString("");
             fieldListPacket = serializer.toByteBuffer();
         }
-
-        statistics.scan_bytes = 0L;
-        statistics.scan_rows = 0L;
-        statistics.cpu_ms = 0L;
+        statistics = statistics.toBuilder().setCpuMs(0L).setScanRows(0).setScanBytes(0).build();
 
         MetricRepo.init();
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
index 4f1525f..f2959f8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
@@ -52,7 +52,7 @@ import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
@@ -433,15 +433,12 @@ public class PartitionCacheTest {
         cp.addBackend(bd2);
         cp.addBackend(bd3);
         
-        PUniqueId key1 = new PUniqueId();
-        key1.hi = 1L;
-        key1.lo = 1L;
+        Types.PUniqueId key1 = Types.PUniqueId.newBuilder().setHi(1L).setLo(1L).build();
         Backend bk = cp.findBackend(key1);
         Assert.assertNotNull(bk);
         Assert.assertEquals(bk.getId(),3);
-        
-        key1.hi = 669560558156283345L;
-        key1.lo = 1L; 
+
+        key1 = key1.toBuilder().setHi(669560558156283345L).build();
         bk = cp.findBackend(key1);
         Assert.assertNotNull(bk);
         Assert.assertEquals(bk.getId(),1);
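
Both test diffs above reflect the same API shift: protobuf-java messages are immutable, unlike the old jprotobuf POJOs with public mutable fields, so updating a single field means a round trip through a builder. The pattern in isolation (helper name is illustrative only):

    import org.apache.doris.proto.Types;

    class BuilderRoundTrip {
        // toBuilder() copies every set field, so only hi changes here.
        static Types.PUniqueId withHi(Types.PUniqueId id, long hi) {
            return id.toBuilder().setHi(hi).build();
        }
    }
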
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
index 844828b..0924fdc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
@@ -18,15 +18,14 @@
 package org.apache.doris.utframe;
 
 import org.apache.doris.common.ThriftServer;
-import org.apache.doris.common.util.JdkUtils;
+import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.HeartbeatService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.utframe.MockedBackendFactory.BeThriftService;
 
-import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
-import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
-import com.baidu.jprotobuf.pbrpc.transport.RpcServer;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
 
 import org.apache.thrift.TProcessor;
 
@@ -48,7 +47,7 @@ public class MockedBackend {
 
     private ThriftServer heartbeatServer;
     private ThriftServer beThriftServer;
-    private RpcServer rpcServer;
+    private Server backendServer;
     
     private String host;
     private int heartbeatPort;
@@ -59,15 +58,10 @@ public class MockedBackend {
     // This must be set explicitly after creating mocked Backend
     private TNetworkAddress feAddress;
 
-    static {
-        int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
-        JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion)));
-    }
-
     public MockedBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
             HeartbeatService.Iface hbService,
-            BeThriftService backendService,
-            Object pBackendService) throws IOException {
+            BeThriftService backendService, PBackendServiceGrpc.PBackendServiceImplBase pBackendService)
+            throws IOException {
 
         this.host = host;
         this.heartbeatPort = heartbeatPort;
@@ -116,7 +110,7 @@ public class MockedBackend {
         System.out.println("Be heartbeat service is started with port: " + heartbeatPort);
         beThriftServer.start();
         System.out.println("Be thrift service is started with port: " + thriftPort);
-        rpcServer.start(brpcPort);
+        backendServer.start();
         System.out.println("Be brpc service is started with port: " + brpcPort);
     }
 
@@ -130,8 +124,8 @@ public class MockedBackend {
         beThriftServer = new ThriftServer(beThriftPort, tprocessor);
     }
 
-    private void createBrpcService(int brpcPort, Object pBackendServiceImpl) {
-        rpcServer = new RpcServer();
-        rpcServer.registerService(pBackendServiceImpl);
+    private void createBrpcService(int brpcPort, PBackendServiceGrpc.PBackendServiceImplBase pBackendServiceImpl) {
+        backendServer = ServerBuilder.forPort(brpcPort)
+                .addService(pBackendServiceImpl).build();
     }
 }
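
Note that ServerBuilder.forPort(brpcPort).addService(...).build() only constructs the grpc server; the backendServer.start() call during backend startup is what actually binds the port. The mocked backend never stops the server explicitly; a sketch of the usual shutdown counterpart, should the test framework ever need one:

    import io.grpc.Server;

    import java.util.concurrent.TimeUnit;

    class GrpcServerShutdownSketch {
        static void stop(Server server) throws InterruptedException {
            server.shutdown();                        // stop accepting new calls
            if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
                server.shutdownNow();                 // cancel calls still in flight
            }
        }
    }
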
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 06b6ecd..2dd7e80 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -18,18 +18,10 @@
 package org.apache.doris.utframe;
 
 import org.apache.doris.common.ClientPool;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PQueryStatistics;
-import org.apache.doris.proto.PStatus;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.rpc.PExecPlanFragmentRequest;
-import org.apache.doris.rpc.PFetchDataRequest;
-import org.apache.doris.rpc.PTriggerProfileReportRequest;
+import org.apache.doris.proto.Data;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.PBackendServiceGrpc;
+import org.apache.doris.proto.Status;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.HeartbeatService;
@@ -70,9 +62,9 @@ import org.apache.doris.thrift.TTransmitDataParams;
 import org.apache.doris.thrift.TTransmitDataResult;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.baidu.jprotobuf.pbrpc.ProtobufRPCService;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+import io.grpc.stub.StreamObserver;
 
 import org.apache.thrift.TException;
 
@@ -94,16 +86,10 @@ public class MockedBackendFactory {
     public static final int BE_DEFAULT_BRPC_PORT = 8060;
     public static final int BE_DEFAULT_HTTP_PORT = 8040;
 
-    // create a default mocked backend with 3 default rpc services
-    public static MockedBackend createDefaultBackend() throws IOException {
-        return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT,
-                new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT),
-                new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
-    }
-
     // create a mocked backend with customize parameters
     public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
-            HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService)
+            HeartbeatService.Iface hbService, BeThriftService beThriftService,
+            PBackendServiceGrpc.PBackendServiceImplBase pBackendService)
             throws IOException {
         MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService,
                 beThriftService, pBackendService);
@@ -284,54 +270,94 @@ public class MockedBackendFactory {
     }
 
     // The default Brpc service.
-    // TODO(cmy): Currently this service cannot correctly simulate the processing of query requests.
-    public static class DefaultPBackendServiceImpl {
-        @ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment")
-        public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) {
+    public static class DefaultPBackendServiceImpl extends PBackendServiceGrpc.PBackendServiceImplBase {
+        @Override
+        public void transmitData(InternalService.PTransmitDataParams request, StreamObserver<InternalService.PTransmitDataResult> responseObserver) {
+            responseObserver.onNext(InternalService.PTransmitDataResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
             System.out.println("get exec_plan_fragment request");
-            PExecPlanFragmentResult result = new PExecPlanFragmentResult();
-            PStatus pStatus = new PStatus();
-            pStatus.status_code = 0;
-            result.status = pStatus;
-            return result;
+            responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
         }
 
-        @ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment")
-        public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) {
+        @Override
+        public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
             System.out.println("get cancel_plan_fragment request");
-            PCancelPlanFragmentResult result = new PCancelPlanFragmentResult();
-            PStatus pStatus = new PStatus();
-            pStatus.status_code = 0;
-            result.status = pStatus;
-            return result;
+            responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
         }
 
-        @ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data")
-        public PFetchDataResult fetchDataAsync(PFetchDataRequest request) {
-            System.out.println("get fetch_data");
-            PFetchDataResult result = new PFetchDataResult();
-            PStatus pStatus = new PStatus();
-            pStatus.status_code = 0;
+        @Override
+        public void fetchData(InternalService.PFetchDataRequest request, StreamObserver<InternalService.PFetchDataResult> responseObserver) {
+            System.out.println("get fetch_data request");
+            responseObserver.onNext(InternalService.PFetchDataResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0))
+                    .setQueryStatistics(Data.PQueryStatistics.newBuilder()
+                            .setScanRows(0L)
+                            .setScanBytes(0L))
+                    .setEos(true)
+                    .setPacketSeq(0L)
+                    .build());
+            responseObserver.onCompleted();
+        }
 
-            PQueryStatistics pQueryStatistics = new PQueryStatistics();
-            pQueryStatistics.scan_rows = 0L;
-            pQueryStatistics.scan_bytes = 0L;
+        @Override
+        public void tabletWriterOpen(InternalService.PTabletWriterOpenRequest request, StreamObserver<InternalService.PTabletWriterOpenResult> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
+        }
 
-            result.status = pStatus;
-            result.packet_seq = 0L;
-            result.query_statistics = pQueryStatistics;
-            result.eos = true;
-            return result;
+        @Override
+        public void tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request, StreamObserver<InternalService.PTabletWriterAddBatchResult> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
         }
 
-        @ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report")
-        public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest request) {
-            return null;
+        @Override
+        public void tabletWriterCancel(InternalService.PTabletWriterCancelRequest request, StreamObserver<InternalService.PTabletWriterCancelResult> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
         }
 
-        @ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info")
-        public PProxyResult getInfo(PProxyRequest request) {
-            return null;
+        @Override
+        public void triggerProfileReport(InternalService.PTriggerProfileReportRequest request, StreamObserver<InternalService.PTriggerProfileReportResult> responseObserver) {
+            System.out.println("get triggerProfileReport request");
+            responseObserver.onNext(InternalService.PTriggerProfileReportResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void getInfo(InternalService.PProxyRequest request, StreamObserver<InternalService.PProxyResult> responseObserver) {
+            System.out.println("get get_info request");
+            responseObserver.onNext(InternalService.PProxyResult.newBuilder()
+                    .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void updateCache(InternalService.PUpdateCacheRequest request, StreamObserver<InternalService.PCacheResponse> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void fetchCache(InternalService.PFetchCacheRequest request, StreamObserver<InternalService.PFetchCacheResult> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void clearCache(InternalService.PClearCacheRequest request, StreamObserver<InternalService.PCacheResponse> responseObserver) {
+            responseObserver.onNext(null);
+            responseObserver.onCompleted();
         }
     }
 }
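
With DefaultPBackendServiceImpl registered, any generated stub can exercise the mock directly. (Note that the cache and tablet-writer handlers above reply with onNext(null); grpc-java does not accept null messages, so those methods would fail if a test actually invoked them. The smoke check below sticks to execPlanFragment.) A hypothetical check against a running MockedBackend, with host and brpc port as configured above:

    import org.apache.doris.proto.InternalService;
    import org.apache.doris.proto.PBackendServiceGrpc;

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    class MockedBackendSmokeCheck {
        static void check(String host, int brpcPort) {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(host, brpcPort)
                    .usePlaintext()
                    .build();
            try {
                InternalService.PExecPlanFragmentResult result = PBackendServiceGrpc
                        .newBlockingStub(channel)
                        .execPlanFragment(InternalService.PExecPlanFragmentRequest.newBuilder().build());
                // DefaultPBackendServiceImpl always answers status_code == 0.
                if (result.getStatus().getStatusCode() != 0) {
                    throw new IllegalStateException("unexpected status from mocked backend");
                }
            } finally {
                channel.shutdownNow();
            }
        }
    }
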
diff --git a/fe/pom.xml b/fe/pom.xml
index 24c949e..2a8f9ea 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -48,7 +48,8 @@ under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <jprotobuf.version>2.2.11</jprotobuf.version>
+        <grpc.version>1.30.0</grpc.version>
+        <protobuf.version>3.5.1</protobuf.version>
         <skip.plugin>false</skip.plugin>
     </properties>
 
@@ -273,40 +274,12 @@ under the License.
                 <version>2.10.1</version>
             </dependency>
 
-            <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf -->
-            <dependency>
-                <groupId>com.baidu</groupId>
-                <artifactId>jprotobuf</artifactId>
-                <version>${jprotobuf.version}</version>
-                <classifier>jar-with-dependencies</classifier>
-            </dependency>
-
-            <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
-            <dependency>
-                <groupId>com.baidu</groupId>
-                <artifactId>jprotobuf-rpc-common</artifactId>
-                <version>1.8</version>
-            </dependency>
-
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
                 <version>2.6</version>
             </dependency>
 
-            <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-core -->
-            <dependency>
-                <groupId>com.baidu</groupId>
-                <artifactId>jprotobuf-rpc-core</artifactId>
-                <version>3.5.21</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>com.baidu</groupId>
-                        <artifactId>jprotobuf</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-
             <!-- https://mvnrepository.com/artifact/org.json/json -->
             <dependency>
                 <groupId>org.json</groupId>
@@ -386,11 +359,32 @@ under the License.
                 <version>2.1</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-netty</artifactId>
+                <version>${grpc.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-protobuf</artifactId>
+                <version>${grpc.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-stub</artifactId>
+                <version>${grpc.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
             <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
             <dependency>
                 <groupId>com.google.protobuf</groupId>
                 <artifactId>protobuf-java</artifactId>
-                <version>3.5.1</version>
+                <version>${protobuf.version}</version>
             </dependency>
 
             <!-- https://mvnrepository.com/artifact/com.squareup/protoparser -->
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d5da5f4..24560e0 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -112,6 +112,7 @@ message PTabletWriterCancelResult {
 };
 
 message PExecPlanFragmentRequest {
+    optional bytes request = 1;
 };
 
 message PExecPlanFragmentResult {
@@ -137,6 +138,7 @@ message PCancelPlanFragmentResult {
 
 message PFetchDataRequest {
     required PUniqueId finst_id = 1;
+    optional bool resp_in_attachment = 2;
 };
 
 message PFetchDataResult {
@@ -145,6 +147,8 @@ message PFetchDataResult {
     optional int64 packet_seq = 2;
     optional bool eos = 3;
     optional PQueryStatistics query_statistics = 4;
+    optional bytes row_batch = 5;
+    optional bool empty_batch = 6;
 };
 
 //Add message definition to fetch and update cache
@@ -190,6 +194,7 @@ message PFetchCacheRequest {
 message PFetchCacheResult {
     required PCacheStatus status = 1;
     repeated PCacheValue values = 2;
+    optional int64 all_count = 3 [default = 0];
 };
 
 enum PClearType {

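The two new PFetchDataResult fields let the serialized row batch travel inside the proto response itself rather than only as a brpc attachment (the payload is presumably a thrift-serialized TResultBatch, as before). A hedged FE-side sketch of consuming them:

    import org.apache.doris.proto.InternalService;

    class FetchDataResultSketch {
        // Returns the raw batch bytes, or null when the packet carries no rows.
        static byte[] rowBatchBytes(InternalService.PFetchDataResult result) {
            if (result.getEmptyBatch()) {
                return null;
            }
            return result.getRowBatch().toByteArray();
        }
    }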