You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@doris.apache.org by GitBox <gi...@apache.org> on 2018/11/10 07:46:46 UTC

[GitHub] imay closed pull request #296: Be compatible with old rpc

imay closed pull request #296: Be compatible with old rpc
URL: https://github.com/apache/incubator-doris/pull/296
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/be/src/exec/olap_table_sink.h b/be/src/exec/olap_table_sink.h
index 804ef9ca..44866d23 100644
--- a/be/src/exec/olap_table_sink.h
+++ b/be/src/exec/olap_table_sink.h
@@ -30,6 +30,7 @@
 #include "exec/olap_table_info.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/palo_internal_service.pb.h"
 #include "util/bitmap.h"
 #include "util/thrift_util.h"
 #include "util/ref_count_closure.h"
@@ -100,7 +101,7 @@ class NodeChannel {
     int64_t _next_packet_seq = 0;
 
     std::unique_ptr<RowBatch> _batch;
-    PInternalService_Stub* _stub = nullptr;
+    palo::PInternalService_Stub* _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
     RefCountClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
 
diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt
index bbbeae72..57449caf 100644
--- a/be/src/gen_cpp/CMakeLists.txt
+++ b/be/src/gen_cpp/CMakeLists.txt
@@ -76,6 +76,7 @@ set(SRC_FILES
     ${GEN_CPP_DIR}/data.pb.cc
     ${GEN_CPP_DIR}/descriptors.pb.cc
     ${GEN_CPP_DIR}/internal_service.pb.cc
+    ${GEN_CPP_DIR}/palo_internal_service.pb.cc
     ${GEN_CPP_DIR}/types.pb.cc
     ${GEN_CPP_DIR}/status.pb.cc
     #$${GEN_CPP_DIR}/opcode/functions.cc
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index fea310e6..d0c60960 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -42,6 +42,7 @@
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/palo_internal_service.pb.h"
 
 #include <arpa/inet.h>
 
@@ -159,7 +160,7 @@ class DataStreamSender::Channel {
     PUniqueId _finst_id;
     PRowBatch _pb_batch;
     PTransmitDataParams _brpc_request;
-    PInternalService_Stub* _brpc_stub = nullptr;
+    palo::PInternalService_Stub* _brpc_stub = nullptr;
     RefCountClosure<PTransmitDataResult>* _closure = nullptr;
     int32_t _brpc_timeout_ms = 500;
 };
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 1a9770dc..75e7350e 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -35,7 +35,10 @@ BRpcService::~BRpcService() {
 
 Status BRpcService::start(int port) {
     // Add service
-    _server->AddService(new PInternalServiceImpl(_exec_env), brpc::SERVER_OWNS_SERVICE);
+    _server->AddService(new PInternalServiceImpl<PBackendService>(_exec_env),
+                        brpc::SERVER_OWNS_SERVICE);
+    _server->AddService(new PInternalServiceImpl<palo::PInternalService>(_exec_env),
+                        brpc::SERVER_OWNS_SERVICE);
     // start service
     brpc::ServerOptions options;
     if (_server->Start(port, &options) != 0) {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index d0e8b912..da388123 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -30,15 +30,18 @@
 
 namespace doris {
 
-PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
+template<typename T>
+PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
         _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
 }
 
-PInternalServiceImpl::~PInternalServiceImpl() {
+template<typename T>
+PInternalServiceImpl<T>::~PInternalServiceImpl() {
 }
 
-void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base,
+template<typename T>
+void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cntl_base,
                                          const PTransmitDataParams* request,
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {
@@ -63,7 +66,8 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b
     }
 }
 
-void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* controller,
+template<typename T>
+void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController* controller,
                                               const PTabletWriterOpenRequest* request,
                                               PTabletWriterOpenResult* response,
                                               google::protobuf::Closure* done) {
@@ -80,7 +84,8 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
     st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::exec_plan_fragment(
+template<typename T>
+void PInternalServiceImpl<T>::exec_plan_fragment(
         google::protobuf::RpcController* cntl_base,
         const PExecPlanFragmentRequest* request,
         PExecPlanFragmentResult* response,
@@ -94,7 +99,8 @@ void PInternalServiceImpl::exec_plan_fragment(
     st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* controller,
+template<typename T>
+void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* controller,
                                                    const PTabletWriterAddBatchRequest* request,
                                                    PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
@@ -118,7 +124,8 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
         });
 }
 
-void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller,
+template<typename T>
+void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcController* controller,
                                                 const PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* response,
                                                 google::protobuf::Closure* done) {
@@ -134,7 +141,8 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment(brpc::Controller* cntl) {
+template<typename T>
+Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
     auto ser_request = cntl->request_attachment().to_string();
     TExecPlanFragmentParams t_request;
     {
@@ -147,7 +155,8 @@ Status PInternalServiceImpl::_exec_plan_fragment(brpc::Controller* cntl) {
     return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
-void PInternalServiceImpl::cancel_plan_fragment(
+template<typename T>
+void PInternalServiceImpl<T>::cancel_plan_fragment(
         google::protobuf::RpcController* cntl_base,
         const PCancelPlanFragmentRequest* request,
         PCancelPlanFragmentResult* result,
@@ -164,7 +173,8 @@ void PInternalServiceImpl::cancel_plan_fragment(
     st.to_protobuf(result->mutable_status());
 }
 
-void PInternalServiceImpl::fetch_data(
+template<typename T>
+void PInternalServiceImpl<T>::fetch_data(
         google::protobuf::RpcController* cntl_base,
         const PFetchDataRequest* request,
         PFetchDataResult* result,
@@ -174,7 +184,8 @@ void PInternalServiceImpl::fetch_data(
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
-void PInternalServiceImpl::fetch_fragment_exec_infos(
+template<typename T>
+void PInternalServiceImpl<T>::fetch_fragment_exec_infos(
         google::protobuf::RpcController* controller,
         const PFetchFragmentExecInfoRequest* request,
         PFetchFragmentExecInfosResult* result,
@@ -187,4 +198,7 @@ void PInternalServiceImpl::fetch_fragment_exec_infos(
     status.to_protobuf(result->mutable_status());
 }
 
+template class PInternalServiceImpl<PBackendService>;
+template class PInternalServiceImpl<palo::PInternalService>;
+
 }
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 21b7d205..5cc5cb6f 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/palo_internal_service.pb.h"
 #include "util/thread_pool.hpp"
 
 namespace brpc {
@@ -29,7 +30,8 @@ namespace doris {
 
 class ExecEnv;
 
-class PInternalServiceImpl : public PInternalService {
+template<typename T>
+class PInternalServiceImpl : public T {
 public:
     PInternalServiceImpl(ExecEnv* exec_env);
     virtual ~PInternalServiceImpl();
diff --git a/be/src/util/brpc_stub_cache.h b/be/src/util/brpc_stub_cache.h
index b82f94bf..7349692e 100644
--- a/be/src/util/brpc_stub_cache.h
+++ b/be/src/util/brpc_stub_cache.h
@@ -22,6 +22,7 @@
 
 #include "gen_cpp/Types_types.h" // TNetworkAddress
 #include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/palo_internal_service.pb.h"
 #include "service/brpc.h"
 #include "util/spinlock.h"
 
@@ -39,7 +40,7 @@ class BrpcStubCache {
         }
     }
 
-    PInternalService_Stub* get_stub(const butil::EndPoint& endpoint) {
+    palo::PInternalService_Stub* get_stub(const butil::EndPoint& endpoint) {
         std::lock_guard<SpinLock> l(_lock);
         auto stub_ptr = _stub_map.seek(endpoint);
         if (stub_ptr != nullptr) {
@@ -51,13 +52,13 @@ class BrpcStubCache {
         if (channel->Init(endpoint, &options)) {
             return nullptr;
         }
-        auto stub = new PInternalService_Stub(
+        auto stub = new palo::PInternalService_Stub(
             channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
         _stub_map.insert(endpoint, stub);
         return stub;
     }
 
-    PInternalService_Stub* get_stub(const TNetworkAddress& taddr) {
+    palo::PInternalService_Stub* get_stub(const TNetworkAddress& taddr) {
         butil::EndPoint endpoint;
         if (str2endpoint(taddr.hostname.c_str(), taddr.port, &endpoint)) {
             LOG(WARNING) << "unknown endpoint, hostname=" << taddr.hostname;
@@ -66,7 +67,7 @@ class BrpcStubCache {
         return get_stub(endpoint);
     }
 
-    PInternalService_Stub* get_stub(const std::string& host, int port) {
+    palo::PInternalService_Stub* get_stub(const std::string& host, int port) {
         butil::EndPoint endpoint;
         if (str2endpoint(host.c_str(), port, &endpoint)) {
             LOG(WARNING) << "unknown endpoint, hostname=" << host;
@@ -77,7 +78,7 @@ class BrpcStubCache {
 
 private:
     SpinLock _lock;
-    butil::FlatMap<butil::EndPoint, PInternalService_Stub*> _stub_map;
+    butil::FlatMap<butil::EndPoint, palo::PInternalService_Stub*> _stub_map;
 };
 
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index f81f5e1a..7b14afc8 100644
--- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -40,7 +40,7 @@
 
     private RpcClient rpcClient;
     // TODO(zc): use TNetworkAddress,
-    private Map<TNetworkAddress, PInternalService> serviceMap;
+    private Map<TNetworkAddress, PBackendService> serviceMap;
 
     private static BackendServiceProxy INSTANCE;
 
@@ -59,12 +59,12 @@ public static BackendServiceProxy getInstance() {
         return INSTANCE;
     }
 
-    private synchronized PInternalService getProxy(TNetworkAddress address) {
-        PInternalService service = serviceMap.get(address);
+    private synchronized PBackendService getProxy(TNetworkAddress address) {
+        PBackendService service = serviceMap.get(address);
         if (service != null) {
             return service;
         }
-        ProtobufRpcProxy<PInternalService> proxy = new ProtobufRpcProxy(rpcClient, PInternalService.class);
+        ProtobufRpcProxy<PBackendService> proxy = new ProtobufRpcProxy(rpcClient, PBackendService.class);
         proxy.setHost(address.getHostname());
         proxy.setPort(address.getPort());
         service = proxy.proxy();
@@ -78,7 +78,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
         final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest();
         pRequest.setRequest(tRequest);
         try {
-            final PInternalService service = getProxy(address);
+            final PBackendService service = getProxy(address);
             return service.execPlanFragmentAsync(pRequest);
         } catch (NoSuchElementException e) {
             try {
@@ -88,7 +88,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
                 } catch (InterruptedException interruptedException) {
                     // do nothing
                 }
-                final PInternalService service = getProxy(address);
+                final PBackendService service = getProxy(address);
                 return service.execPlanFragmentAsync(pRequest);
             } catch (NoSuchElementException noSuchElementException) {
                 LOG.warn("Execute plan fragment retry failed, address={}:{}",
@@ -106,7 +106,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
             TNetworkAddress address, TUniqueId finstId) throws RpcException {
         final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest(new PUniqueId(finstId));;
         try {
-            final PInternalService service = getProxy(address);
+            final PBackendService service = getProxy(address);
             return service.cancelPlanFragmentAsync(pRequest);
         } catch (NoSuchElementException e) {
             // retry
@@ -116,7 +116,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
                 } catch (InterruptedException interruptedException) {
                     // do nothing
                 }
-                final PInternalService service = getProxy(address);
+                final PBackendService service = getProxy(address);
                 return service.cancelPlanFragmentAsync(pRequest);
             } catch (NoSuchElementException noSuchElementException) {
                 LOG.warn("Cancel plan fragment retry failed, address={}:{}",
@@ -133,7 +133,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
     public Future<PFetchDataResult> fetchDataAsync(
             TNetworkAddress address, PFetchDataRequest request) throws RpcException {
         try {
-            PInternalService service = getProxy(address);
+            PBackendService service = getProxy(address);
             return service.fetchDataAsync(request);
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
@@ -146,7 +146,7 @@ private synchronized PInternalService getProxy(TNetworkAddress address) {
     public Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync(
             TNetworkAddress address, PFetchFragmentExecInfoRequest request) throws RpcException {
         try {
-            final PInternalService service = getProxy(address);
+            final PBackendService service = getProxy(address);
             return service.fetchFragmentExecInfosAsync(request);
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
diff --git a/fe/src/main/java/org/apache/doris/rpc/PInternalService.java b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java
similarity index 81%
rename from fe/src/main/java/org/apache/doris/rpc/PInternalService.java
rename to fe/src/main/java/org/apache/doris/rpc/PBackendService.java
index 1fed5162..5fe54901 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PInternalService.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java
@@ -21,21 +21,21 @@
 
 import java.util.concurrent.Future;
 
-public interface PInternalService {
-    @ProtobufRPC(serviceName = "PInternalService", methodName = "exec_plan_fragment",
+public interface PBackendService {
+    @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
     Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request);
 
-    @ProtobufRPC(serviceName = "PInternalService", methodName = "cancel_plan_fragment",
+    @ProtobufRPC(serviceName = "PBackendService", methodName = "cancel_plan_fragment",
             onceTalkTimeout = 5000)
     Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(PCancelPlanFragmentRequest request);
 
     // we set timeout to 1 day, because now there is no way to give different timeout for each RPC call
-    @ProtobufRPC(serviceName = "PInternalService", methodName = "fetch_data",
+    @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_data",
             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000)
     Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
 
-    @ProtobufRPC(serviceName = "PInternalService", methodName = "fetch_fragment_exec_infos",
+    @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_fragment_exec_infos",
             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
     Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync(PFetchFragmentExecInfoRequest request);
 }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 307081fb..aaeb4bd7 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -153,7 +153,7 @@ message PFetchFragmentExecInfosResult {
     repeated PFragmentExecInfo fragment_exec_info = 2;
 }
 
-service PInternalService {
+service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
@@ -162,5 +162,7 @@ service PInternalService {
     rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
     rpc fetch_fragment_exec_infos(PFetchFragmentExecInfoRequest) returns (PFetchFragmentExecInfosResult);
+    // NOTE(zc): If you want to add a new method here,
+    // you MUST add the same method to palo_internal_service.proto
 };
 
diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto
new file mode 100644
index 00000000..5a91e82b
--- /dev/null
+++ b/gensrc/proto/palo_internal_service.proto
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// NOTE(XXX): DEPRECATED, kept only for compatibility with old versions,
+// so that the system can be upgraded gradually (grayscale upgrade).
+syntax="proto2";
+
+import "internal_service.proto";
+
+package palo;
+
+option cc_generic_services = true;
+
+service PInternalService {
+    rpc transmit_data(doris.PTransmitDataParams) returns (doris.PTransmitDataResult);
+    rpc exec_plan_fragment(doris.PExecPlanFragmentRequest) returns (doris.PExecPlanFragmentResult);
+    rpc cancel_plan_fragment(doris.PCancelPlanFragmentRequest) returns (doris.PCancelPlanFragmentResult);
+    rpc fetch_data(doris.PFetchDataRequest) returns (doris.PFetchDataResult);
+    rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult);
+    rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult);
+    rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult);
+    rpc fetch_fragment_exec_infos(doris.PFetchFragmentExecInfoRequest) returns (doris.PFetchFragmentExecInfosResult);
+};


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org
For additional commands, e-mail: dev-help@doris.apache.org