You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/04/21 02:26:10 UTC
[incubator-doris] branch master updated: [Refactor] Remove
jprotobuf and use grpc client to connect brpc service (#5650)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b121ad6 [Refactor] Remove jprotobuf and use grpc client to connect brpc service (#5650)
b121ad6 is described below
commit b121ad6b953aaa29ad38491a435f09e049c40d3f
Author: Zhengguo Yang <78...@qq.com>
AuthorDate: Wed Apr 21 10:25:58 2021 +0800
[Refactor] Remove jprotobuf and use grpc client to connect brpc service (#5650)
---
be/src/runtime/buffer_control_block.cpp | 28 ++-
be/src/runtime/buffer_control_block.h | 10 +-
be/src/service/internal_service.cpp | 14 +-
be/src/service/internal_service.h | 2 +-
fe/fe-core/pom.xml | 102 ++++-----
.../main/java/org/apache/doris/common/Config.java | 14 +-
.../main/java/org/apache/doris/common/Status.java | 8 +-
.../common/proc/CurrentQueryInfoProvider.java | 38 ++--
.../org/apache/doris/common/util/DebugUtil.java | 6 +-
.../org/apache/doris/common/util/KafkaUtil.java | 50 ++---
.../org/apache/doris/planner/OlapScanNode.java | 8 +-
.../java/org/apache/doris/qe/ConnectProcessor.java | 14 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 105 +++++-----
.../java/org/apache/doris/qe/ResultReceiver.java | 44 ++--
.../main/java/org/apache/doris/qe/RowBatch.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 63 +++---
.../main/java/org/apache/doris/qe/cache/Cache.java | 3 +-
.../org/apache/doris/qe/cache/CacheAnalyzer.java | 19 +-
.../org/apache/doris/qe/cache/CacheBeProxy.java | 72 +++----
.../apache/doris/qe/cache/CacheCoordinator.java | 20 +-
.../java/org/apache/doris/qe/cache/CacheProxy.java | 228 +--------------------
.../org/apache/doris/qe/cache/PartitionCache.java | 50 +++--
.../org/apache/doris/qe/cache/RowBatchBuilder.java | 47 +++--
.../java/org/apache/doris/qe/cache/SqlCache.java | 35 +++-
.../org/apache/doris/rpc/BackendServiceClient.java | 84 ++++++++
.../org/apache/doris/rpc/BackendServiceProxy.java | 171 ++++++----------
.../java/org/apache/doris/rpc/PBackendService.java | 67 ------
.../apache/doris/rpc/PExecPlanFragmentRequest.java | 24 ---
.../org/apache/doris/rpc/PFetchDataRequest.java | 33 ---
.../doris/rpc/PTriggerProfileReportRequest.java | 42 ----
.../doris/rpc/ThriftClientAttachmentHandler.java | 33 ---
.../org/apache/doris/planner/QueryPlanTest.java | 5 +-
.../org/apache/doris/qe/ConnectProcessorTest.java | 9 +-
.../org/apache/doris/qe/PartitionCacheTest.java | 11 +-
.../org/apache/doris/utframe/MockedBackend.java | 26 +--
.../apache/doris/utframe/MockedBackendFactory.java | 140 +++++++------
fe/pom.xml | 54 +++--
gensrc/proto/internal_service.proto | 5 +
38 files changed, 683 insertions(+), 1003 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 9d21999..3846fa5 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -45,16 +45,28 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
}
void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos) {
- uint8_t* buf = nullptr;
- uint32_t len = 0;
- ThriftSerializer ser(false, 4096);
- auto st = ser.serialize(&t_result->result_batch, &len, &buf);
- if (st.ok()) {
- cntl->response_attachment().append(buf, len);
+ Status st = Status::OK();
+ if (t_result != nullptr) {
+ uint8_t* buf = nullptr;
+ uint32_t len = 0;
+ ThriftSerializer ser(false, 4096);
+ st = ser.serialize(&t_result->result_batch, &len, &buf);
+ if (st.ok()) {
+ if (resp_in_attachment) {
+ // TODO(yangzhengguo) this is only for compatibility with old versions; it should be removed in release 0.15
+ cntl->response_attachment().append(buf, len);
+ } else {
+ result->set_row_batch(std::string((const char*)buf, len));
+ }
+ result->set_packet_seq(packet_seq);
+ result->set_eos(eos);
+ } else {
+ LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st.get_error_msg();
+ }
+ } else {
+ result->set_empty_batch(true);
result->set_packet_seq(packet_seq);
result->set_eos(eos);
- } else {
- LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st.get_error_msg();
}
st.to_protobuf(result->mutable_status());
done->Run();
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 517ae0b..f3e12e2 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -44,12 +44,18 @@ class PFetchDataResult;
struct GetResultBatchCtx {
brpc::Controller* cntl = nullptr;
+ // In version 0.15, we change brpc client from jprotobuf to grpc.
+ // And the response data is moved from rpc attachment to protobuf body.
+ // This variable is for backwards compatibility when upgrading Doris.
+ // If set to true, the response data is still transferred as attachment.
+ // If set to false, the response data is transferred in protobuf body.
+ bool resp_in_attachment = true;
PFetchDataResult* result = nullptr;
google::protobuf::Closure* done = nullptr;
- GetResultBatchCtx(brpc::Controller* cntl_, PFetchDataResult* result_,
+ GetResultBatchCtx(brpc::Controller* cntl_, bool resp_in_attachment_, PFetchDataResult* result_,
google::protobuf::Closure* done_)
- : cntl(cntl_), result(result_), done(done_) {}
+ : cntl(cntl_), resp_in_attachment(resp_in_attachment_), result(result_), done(done_) {}
void on_failure(const Status& status);
void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 51480fb..a0c4bc9 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -77,7 +77,13 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- auto st = _exec_plan_fragment(cntl);
+ auto st = Status::OK();
+ if (request->has_request()) {
+ st = _exec_plan_fragment(request->request());
+ } else {
+ // TODO(yangzhengguo) this is only for compatibility with old versions; it should be removed in release 0.15
+ st = _exec_plan_fragment(cntl->request_attachment().to_string());
+ }
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
}
@@ -129,8 +135,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
}
template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
- auto ser_request = cntl->request_attachment().to_string();
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request) {
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -172,7 +177,8 @@ void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_b
const PFetchDataRequest* request, PFetchDataResult* result,
google::protobuf::Closure* done) {
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+ bool resp_in_attachment = request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
+ GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, resp_in_attachment, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 1010f14..91196ae 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -89,7 +89,7 @@ public:
PCacheResponse* response, google::protobuf::Closure* done) override;
private:
- Status _exec_plan_fragment(brpc::Controller* cntl);
+ Status _exec_plan_fragment(const std::string& s_request);
private:
ExecEnv* _exec_env;
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 9a91ab2..0711f55 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -229,36 +229,11 @@ under the License.
<artifactId>joda-time</artifactId>
</dependency>
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf</artifactId>
- <classifier>jar-with-dependencies</classifier>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf-rpc-common</artifactId>
- </dependency>
-
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-core -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf-rpc-core</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
@@ -607,6 +582,20 @@ under the License.
<artifactId>tree-printer</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -674,28 +663,46 @@ under the License.
</configuration>
</plugin>
- <!-- run make to generate Version and builtin -->
- <!-- also parse the proto for FE -->
+
+ <!-- protobuf -->
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>3.0.0</version>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>3.11.1</version>
<executions>
<execution>
- <id>make-dir</id>
<phase>generate-sources</phase>
<goals>
- <goal>exec</goal>
+ <goal>run</goal>
</goals>
<configuration>
- <executable>mkdir</executable>
- <arguments>
- <argument>-p</argument>
- <argument>${basedir}/target/generated-sources/proto</argument>
- </arguments>
- <skip>${skip.plugin}</skip>
+ <protocCommand>${doris.thirdparty}/installed/bin/protoc</protocCommand>
+ <!-- You can use the following protocArtifact instead of protocCommand, so that you don't need to install the protobuf tools -->
+ <!--protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact-->
+ <protocVersion>${protobuf.version}</protocVersion>
+ <inputDirectories>
+ <include>${doris.home}/gensrc/proto</include>
+ </inputDirectories>
+ <outputTargets>
+ <outputTarget>
+ <type>java</type>
+ </outputTarget>
+ <outputTarget>
+ <type>grpc-java</type>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
+ </outputTarget>
+ </outputTargets>
</configuration>
</execution>
+ </executions>
+ </plugin>
+ <!-- run make to generate Version and builtin -->
+ <!-- also parse the proto for FE -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
<execution>
<id>gensrc</id>
<phase>generate-sources</phase>
@@ -711,24 +718,6 @@ under the License.
<skip>${skip.plugin}</skip>
</configuration>
</execution>
- <execution>
- <id>gen_proto</id>
- <phase>generate-sources</phase>
- <goals>
- <!-- DO NOT use goal 'java', it will terminate the VM after done -->
- <goal>exec</goal>
- </goals>
- <configuration>
- <executable>${java.home}/bin/java</executable>
- <arguments>
- <argument>-jar</argument>
- <argument>${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar</argument>
- <argument>--java_out=${basedir}/target/generated-sources/proto</argument>
- <argument>${doris.home}/gensrc/proto/internal_service.proto</argument>
- </arguments>
- <skip>${skip.plugin}</skip>
- </configuration>
- </execution>
</executions>
</plugin>
@@ -748,7 +737,6 @@ under the License.
<sources>
<!-- add arbitrary num of src dirs here -->
<source>${basedir}/target/generated-sources/build/</source>
- <source>${basedir}/target/generated-sources/proto/</source>
</sources>
</configuration>
</execution>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f949eec..3a42551 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -900,13 +900,6 @@ public class Config extends ConfigBase {
// All frontends will get tablet stat from all backends at each interval
@ConfField public static int tablet_stat_update_interval_second = 300; // 5 min
- // May be necessary to modify the following BRPC configurations in high concurrency scenarios.
- // The number of concurrent requests BRPC can processed
- @ConfField public static int brpc_number_of_concurrent_requests_processed = 4096;
-
- // BRPC idle wait time (ms)
- @ConfField public static int brpc_idle_wait_max_time = 10000;
-
/**
* if set to false, auth check will be disable, in case some goes wrong with the new privilege system.
*/
@@ -1355,4 +1348,11 @@ public class Config extends ConfigBase {
*/
@ConfField
public static boolean enable_outfile_to_local = false;
+
+ /**
+ * Used to set the initial flow window size of the GRPC client channel, and also used as the max message size.
+ * When the result set is large, you may need to increase this value.
+ */
+ @ConfField
+ public static int grpc_max_message_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 2fad775..7d6b7c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -17,7 +17,7 @@
package org.apache.doris.common;
-import org.apache.doris.proto.PStatus;
+import org.apache.doris.proto.Status.PStatus;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@@ -81,9 +81,9 @@ public class Status {
}
public void setPstatus(PStatus status) {
- this.errorCode = TStatusCode.findByValue(status.status_code);
- if (status.error_msgs != null && !status.error_msgs.isEmpty()) {
- this.errorMsg = status.error_msgs.get(0);
+ this.errorCode = TStatusCode.findByValue(status.getStatusCode());
+ if (!status.getErrorMsgsList().isEmpty()) {
+ this.errorMsg = status.getErrorMsgs(0);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index 46add4d..0dd2c8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -23,11 +23,10 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Counter;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PTriggerProfileReportRequest;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
@@ -203,9 +202,10 @@ public class CurrentQueryInfoProvider {
}
// specified query instance which will report.
if (!allQuery) {
- final PUniqueId pUId = new PUniqueId();
- pUId.hi = instanceInfo.getInstanceId().hi;
- pUId.lo = instanceInfo.getInstanceId().lo;
+ final Types.PUniqueId pUId = Types.PUniqueId.newBuilder()
+ .setHi(instanceInfo.getInstanceId().hi)
+ .setLo(instanceInfo.getInstanceId().lo)
+ .build();
request.addInstanceId(pUId);
}
}
@@ -213,13 +213,13 @@ public class CurrentQueryInfoProvider {
recvResponse(sendRequest(requests));
}
- private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest(
+ private List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> sendRequest(
Map<TNetworkAddress, Request> requests) throws AnalysisException {
- final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList();
+ final List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures = Lists.newArrayList();
for (TNetworkAddress address : requests.keySet()) {
final Request request = requests.get(address);
- final PTriggerProfileReportRequest pbRequest =
- new PTriggerProfileReportRequest(request.getInstanceIds());
+ final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest
+ .newBuilder().addAllInstanceIds(request.getInstanceIds()).build();
try {
futures.add(Pair.create(request, BackendServiceProxy.getInstance().
triggerProfileReportAsync(address, pbRequest)));
@@ -230,18 +230,18 @@ public class CurrentQueryInfoProvider {
return futures;
}
- private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures)
+ private void recvResponse(List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures)
throws AnalysisException {
final String reasonPrefix = "Fail to receive result.";
- for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) {
+ for (Pair<Request, Future<InternalService.PTriggerProfileReportResult>> pair : futures) {
try {
- final PTriggerProfileReportResult result
+ final InternalService.PTriggerProfileReportResult result
= pair.second.get(2, TimeUnit.SECONDS);
- final TStatusCode code = TStatusCode.findByValue(result.status.status_code);
+ final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
String errMsg = "";
- if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
- errMsg = result.status.error_msgs.get(0);
+ if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+ errMsg = result.getStatus().getErrorMsgs(0);
}
throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
+ " reason:" + errMsg);
@@ -347,7 +347,7 @@ public class CurrentQueryInfoProvider {
private static class Request {
private final TNetworkAddress address;
- private final List<PUniqueId> instanceIds;
+ private final List<Types.PUniqueId> instanceIds;
public Request(TNetworkAddress address) {
this.address = address;
@@ -358,11 +358,11 @@ public class CurrentQueryInfoProvider {
return address;
}
- public List<PUniqueId> getInstanceIds() {
+ public List<Types.PUniqueId> getInstanceIds() {
return instanceIds;
}
- public void addInstanceId(PUniqueId instanceId) {
+ public void addInstanceId(Types.PUniqueId instanceId) {
this.instanceIds.add(instanceId);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
index 012a5ee..c8eca0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -18,7 +18,7 @@
package org.apache.doris.common.util;
import org.apache.doris.common.Pair;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TUniqueId;
import java.io.PrintWriter;
@@ -135,9 +135,9 @@ public class DebugUtil {
return builder.toString();
}
- public static String printId(final PUniqueId id) {
+ public static String printId(final Types.PUniqueId id) {
StringBuilder builder = new StringBuilder();
- builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
+ builder.append(Long.toHexString(id.getHi())).append("-").append(Long.toHexString(id.getLo()));
return builder.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 6f75f8e..4ecb520 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -21,19 +21,13 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
-import org.apache.doris.proto.PKafkaLoadInfo;
-import org.apache.doris.proto.PKafkaMetaProxyRequest;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PStringPair;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
-import com.google.common.collect.Lists;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
@@ -61,31 +56,30 @@ public class KafkaUtil {
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
// create request
- PKafkaLoadInfo kafkaLoadInfo = new PKafkaLoadInfo();
- kafkaLoadInfo.brokers = brokerList;
- kafkaLoadInfo.topic = topic;
- for (Map.Entry<String, String> entry : convertedCustomProperties.entrySet()) {
- PStringPair pair = new PStringPair();
- pair.key = entry.getKey();
- pair.val = entry.getValue();
- if (kafkaLoadInfo.properties == null) {
- kafkaLoadInfo.properties = Lists.newArrayList();
- }
- kafkaLoadInfo.properties.add(pair);
- }
- PKafkaMetaProxyRequest kafkaRequest = new PKafkaMetaProxyRequest();
- kafkaRequest.kafka_info = kafkaLoadInfo;
- PProxyRequest request = new PProxyRequest();
- request.kafka_meta_request = kafkaRequest;
+ InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+ InternalService.PKafkaMetaProxyRequest.newBuilder()
+ .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+ .setBrokers(brokerList)
+ .setTopic(topic)
+ .addAllProperties(
+ convertedCustomProperties.entrySet().stream().map(
+ e -> InternalService.PStringPair.newBuilder()
+ .setKey(e.getKey())
+ .setVal(e.getValue())
+ .build()
+ ).collect(Collectors.toList())
+ )
+ )
+ ).build();
// get info
- Future<PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
- PProxyResult result = future.get(5, TimeUnit.SECONDS);
- TStatusCode code = TStatusCode.findByValue(result.status.status_code);
+ Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
+ InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
- throw new UserException("failed to get kafka partition info: " + result.status.error_msgs);
+ throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
} else {
- return result.kafka_meta_result.partition_ids;
+ return result.getKafkaMetaResult().getPartitionIdsList();
}
} catch (Exception e) {
LOG.warn("failed to get partitions.", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 4d260ae..3ec9866 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -242,9 +242,13 @@ public class OlapScanNode extends ScanNode {
this.selectedIndexId = selectedIndexId;
setIsPreAggregation(isPreAggregation, reasonOfDisable);
updateColumnType();
- LOG.info("Using the new scan range info instead of the old one. {}, {}", situation ,scanRangeInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using the new scan range info instead of the old one. {}, {}", situation ,scanRangeInfo);
+ }
} else {
- LOG.warn("Using the old scan range info instead of the new one. {}, {}", situation, scanRangeInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using the old scan range info instead of the new one. {}, {}", situation, scanRangeInfo);
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 2934923..8a30f1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -44,7 +44,7 @@ import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.plugin.AuditEvent.EventType;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
@@ -107,16 +107,16 @@ public class ConnectProcessor {
ctx.getState().setOk();
}
- private void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStatistics statistics) {
+ private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics) {
// slow query
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setState(ctx.getState().toString()).setQueryTime(elapseMs)
- .setScanBytes(statistics == null ? 0 : statistics.scan_bytes)
- .setScanRows(statistics == null ? 0 : statistics.scan_rows)
- .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms)
+ .setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
+ .setScanRows(statistics == null ? 0 : statistics.getScanRows())
+ .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
@@ -181,7 +181,7 @@ public class ConnectProcessor {
// execute this query.
StatementBase parsedStmt = null;
- List<Pair<StatementBase, PQueryStatistics>> auditInfoList = Lists.newArrayList();
+ List<Pair<StatementBase, Data.PQueryStatistics>> auditInfoList = Lists.newArrayList();
boolean alreadyAddedToAuditInfoList = false;
try {
List<StatementBase> stmts = analyze(originStmt);
@@ -232,7 +232,7 @@ public class ConnectProcessor {
// audit after exec
if (!auditInfoList.isEmpty()) {
- for (Pair<StatementBase, PQueryStatistics> audit : auditInfoList) {
+ for (Pair<StatementBase, Data.PQueryStatistics> audit : auditInfoList) {
auditAfterExec(originStmt.replace("\n", " "), audit.first, audit.second);
}
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7b8e880..c36e6b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -50,9 +50,7 @@ import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.UnionNode;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
-import org.apache.doris.proto.PStatus;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
@@ -80,11 +78,6 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TUniqueId;
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultiset;
@@ -94,6 +87,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -318,7 +316,7 @@ public class Coordinator {
for (PlanFragment fragment : fragments) {
fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
}
-
+
// set inputFragments
for (PlanFragment fragment : fragments) {
if (!(fragment.getSink() instanceof DataStreamSink)) {
@@ -335,7 +333,7 @@ public class Coordinator {
queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
fragmentProfile = new ArrayList<RuntimeProfile>();
- for (int i = 0; i < fragmentSize; i ++) {
+ for (int i = 0; i < fragmentSize; i++) {
fragmentProfile.add(new RuntimeProfile("Fragment " + i));
queryProfile.addChild(fragmentProfile.get(i));
}
@@ -469,7 +467,8 @@ public class Coordinator {
int instanceNum = params.instanceExecParams.size();
Preconditions.checkState(instanceNum > 0);
List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
- List<Pair<BackendExecState, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
+ List<Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>>> futures =
+ Lists.newArrayList();
// update memory limit for colocate join
if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
@@ -511,16 +510,16 @@ public class Coordinator {
backendIdx++;
}
- for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
+ for (Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
TStatusCode code;
String errMsg = null;
Exception exception = null;
try {
- PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
+ InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
TimeUnit.MILLISECONDS);
- code = TStatusCode.findByValue(result.status.status_code);
- if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
- errMsg = result.status.error_msgs.get(0);
+ code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+ errMsg = result.getStatus().getErrorMsgsList().get(0);
}
} catch (ExecutionException e) {
LOG.warn("catch a execute exception", e);
@@ -548,7 +547,7 @@ public class Coordinator {
LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}",
errMsg, code, fragment.getFragmentId(),
pair.first.address.hostname, pair.first.address.port);
- cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
@@ -669,7 +668,7 @@ public class Coordinator {
queryStatus.setStatus(status);
LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}",
jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN");
- cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
} finally {
lock.unlock();
}
@@ -720,13 +719,13 @@ public class Coordinator {
this.returnedAllResults = true;
// if this query is a block query do not cancel.
- Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
+ Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
LOG.debug("no block query, return num >= limit rows, need cancel");
- cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH);
+ cancelInternal(InternalService.PPlanFragmentCancelReason.LIMIT_REACH);
}
- } else {
+ } else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}
@@ -746,13 +745,13 @@ public class Coordinator {
queryStatus.setStatus(Status.CANCELLED);
}
LOG.warn("cancel execution of query, this is outside invoke");
- cancelInternal(PPlanFragmentCancelReason.USER_CANCEL);
+ cancelInternal(InternalService.PPlanFragmentCancelReason.USER_CANCEL);
} finally {
unlock();
}
}
- private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
+ private void cancelInternal(InternalService.PPlanFragmentCancelReason cancelReason) {
if (null != receiver) {
receiver.cancel();
}
@@ -760,11 +759,11 @@ public class Coordinator {
if (profileDoneSignal != null) {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero(new Status());
- LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e->DebugUtil.printId(e.getKey())).toArray());
+ LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks().stream().map(e -> DebugUtil.printId(e.getKey())).toArray());
}
}
- private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
+ private void cancelRemoteFragmentsAsync(InternalService.PPlanFragmentCancelReason cancelReason) {
for (BackendExecState backendExecState : backendExecStates) {
backendExecState.cancelFragmentInstance(cancelReason);
}
@@ -971,7 +970,7 @@ public class Coordinator {
throw new UserException("there is no scanNode Backend");
}
this.addressToBackendID.put(execHostport, backendIdRef.getRef());
- FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
+ FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
0, params);
params.instanceExecParams.add(instanceParam);
continue;
@@ -1018,7 +1017,7 @@ public class Coordinator {
// random select some instance
// get distinct host, when parallel_fragment_exec_instance_num > 1, single host may execute several instances
Set<TNetworkAddress> hostSet = Sets.newHashSet();
- for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
+ for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
hostSet.add(execParams.host);
}
List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
@@ -1028,7 +1027,7 @@ public class Coordinator {
params.instanceExecParams.add(instanceParam);
}
} else {
- for (FInstanceExecParam execParams: fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
+ for (FInstanceExecParam execParams : fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execParams.host, 0, params);
params.instanceExecParams.add(instanceParam);
}
@@ -1125,7 +1124,7 @@ public class Coordinator {
return false;
}
-
+
// Returns the id of the leftmost node of any of the gives types in 'plan_root',
// or INVALID_PLAN_NODE_ID if no such node present.
private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
@@ -1206,7 +1205,6 @@ public class Coordinator {
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
-
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.bucketSeqSet.add(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
@@ -1269,14 +1267,14 @@ public class Coordinator {
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
- for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+ for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
}
- for(TScanRangeLocations location: locations) {
+ for (TScanRangeLocations location : locations) {
Map<Integer, List<TScanRangeParams>> scanRanges =
findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
@@ -1434,7 +1432,7 @@ public class Coordinator {
* return true otherwise.
* NOTICE: return true does not mean that coordinator executed success,
* the caller should check queryStatus for result.
- *
+ *
* We divide the entire waiting process into multiple rounds,
* with a maximum of 30 seconds per round. And after each round of waiting,
* check the status of the BE. If the BE status is abnormal, the wait is ended
@@ -1586,7 +1584,7 @@ public class Coordinator {
//buckendIdToBucketCountMap does not contain the new backend, insert into it
if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
- } else { //buckendIdToBucketCountMap contains the new backend, update it
+ } else { //buckendIdToBucketCountMap contains the new backend, update it
buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
}
} else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
@@ -1608,14 +1606,14 @@ public class Coordinator {
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId());
- for (Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+ for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, idToBackend, addressToBackendID);
}
- for(TScanRangeLocations location: locations) {
+ for (TScanRangeLocations location : locations) {
Map<Integer, List<TScanRangeParams>> scanRanges =
findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
@@ -1689,7 +1687,6 @@ public class Coordinator {
}
}
-
private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
// cache the fragment id to its scan node ids. Used for colocate join.
@@ -1711,7 +1708,7 @@ public class Coordinator {
TNetworkAddress address;
Backend backend;
long lastMissingHeartbeatTime = -1;
-
+
public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId,
TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID) {
this.profileFragmentId = profileFragmentId;
@@ -1769,7 +1766,7 @@ public class Coordinator {
// cancel the fragment instance.
// return true if cancel success. Otherwise, return false
- public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason cancelReason) {
+ public synchronized boolean cancelFragmentInstance(InternalService.PPlanFragmentCancelReason cancelReason) {
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}, fragment instance id={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
@@ -1822,7 +1819,7 @@ public class Coordinator {
return true;
}
- public Future<PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
+ public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
TNetworkAddress brpcAddress = null;
try {
brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
@@ -1835,7 +1832,7 @@ public class Coordinator {
} catch (RpcException e) {
// DO NOT throw exception here, return a complete future with error code,
// so that the following logic will cancel the fragment.
- return new Future<PExecPlanFragmentResult>() {
+ return new Future<InternalService.PExecPlanFragmentResult>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
@@ -1852,19 +1849,19 @@ public class Coordinator {
}
@Override
- public PExecPlanFragmentResult get() {
- PExecPlanFragmentResult result = new PExecPlanFragmentResult();
- PStatus pStatus = new PStatus();
- pStatus.error_msgs = Lists.newArrayList();
- pStatus.error_msgs.add(e.getMessage());
- // use THRIFT_RPC_ERROR so that this BE will be added to the blacklist later.
- pStatus.status_code = TStatusCode.THRIFT_RPC_ERROR.getValue();
- result.status = pStatus;
+ public InternalService.PExecPlanFragmentResult get() {
+ InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult
+ .newBuilder()
+ .setStatus(org.apache.doris.proto.Status.PStatus.newBuilder()
+ .addErrorMsgs(e.getMessage())
+ .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
+ .build())
+ .build();
return result;
}
@Override
- public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
+ public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
return get();
}
};
@@ -1887,8 +1884,8 @@ public class Coordinator {
// used to assemble TPlanFragmentExecParas
protected class FragmentExecParams {
public PlanFragment fragment;
- public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
- public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
+ public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
+ public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
public List<PlanFragmentId> inputFragments = Lists.newArrayList();
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
@@ -1960,8 +1957,8 @@ public class Coordinator {
TEsScanRange esScanRange = range.getScanRange().getEsScanRange();
if (esScanRange != null) {
sb.append("{ index=").append(esScanRange.getIndex())
- .append(", shardid=").append(esScanRange.getShardId())
- .append("}");
+ .append(", shardid=").append(esScanRange.getShardId())
+ .append("}");
}
}
sb.append("]");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 9166543..7978ebc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -18,10 +18,9 @@
package org.apache.doris.qe;
import org.apache.doris.common.Status;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PFetchDataRequest;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
@@ -45,14 +44,12 @@ public class ResultReceiver {
private long packetIdx = 0;
private long timeoutTs = 0;
private TNetworkAddress address;
- private PUniqueId finstId;
+ private Types.PUniqueId finstId;
private Long backendId;
private Thread currentThread;
public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) {
- this.finstId = new PUniqueId();
- this.finstId.hi = tid.hi;
- this.finstId.lo = tid.lo;
+ this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
this.backendId = backendId;
this.address = address;
this.timeoutTs = System.currentTimeMillis() + timeoutMs;
@@ -65,11 +62,14 @@ public class ResultReceiver {
final RowBatch rowBatch = new RowBatch();
try {
while (!isDone && !isCancel) {
- PFetchDataRequest request = new PFetchDataRequest(finstId);
-
+ InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
+ .setFinstId(finstId)
+ .setRespInAttachment(false)
+ .build();
+
currentThread = Thread.currentThread();
- Future<PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
- PFetchDataResult pResult = null;
+ Future<InternalService.PFetchDataResult> future = BackendServiceProxy.getInstance().fetchDataAsync(address, request);
+ InternalService.PFetchDataResult pResult = null;
while (pResult == null) {
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
@@ -86,30 +86,34 @@ public class ResultReceiver {
}
}
}
- TStatusCode code = TStatusCode.findByValue(pResult.status.status_code);
+ TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
- status.setPstatus(pResult.status);
+ status.setPstatus(pResult.getStatus());
return null;
}
- rowBatch.setQueryStatistics(pResult.query_statistics);
+ rowBatch.setQueryStatistics(pResult.getQueryStatistics());
- if (packetIdx != pResult.packet_seq) {
- LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.packet_seq);
+ if (packetIdx != pResult.getPacketSeq()) {
+ LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.getPacketSeq());
status.setRpcStatus("receive error packet");
return null;
}
packetIdx++;
- isDone = pResult.eos;
+ isDone = pResult.getEos();
- byte[] serialResult = request.getSerializedResult();
- if (serialResult != null && serialResult.length > 0) {
+ if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
+ LOG.info("get first empty rowbatch");
+ rowBatch.setEos(false);
+ return rowBatch;
+ } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
+ byte[] serialResult = pResult.getRowBatch().toByteArray();
TResultBatch resultBatch = new TResultBatch();
TDeserializer deserializer = new TDeserializer();
deserializer.deserialize(resultBatch, serialResult);
rowBatch.setBatch(resultBatch);
- rowBatch.setEos(pResult.eos);
+ rowBatch.setEos(pResult.getEos());
return rowBatch;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
index b6dfce9..087babb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/RowBatch.java
@@ -17,7 +17,7 @@
package org.apache.doris.qe;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data.PQueryStatistics;
import org.apache.doris.thrift.TResultBatch;
public final class RowBatch {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4a361ec..0559773 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -70,31 +70,32 @@ import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.Planner;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
-import org.apache.doris.qe.cache.CacheBeProxy;
-import org.apache.doris.qe.cache.CacheProxy;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionStatus;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.glassfish.jersey.internal.guava.Sets;
-
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.glassfish.jersey.internal.guava.Sets;
import java.io.IOException;
import java.io.StringReader;
@@ -127,7 +128,7 @@ public class StmtExecutor {
private Planner planner;
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
- private PQueryStatistics statisticsForAuditLog;
+ private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
@@ -626,13 +627,21 @@ public class StmtExecutor {
// return true if the meta fields has been sent, otherwise, return false.
// the meta fields must be sent right before the first batch of data(or eos flag).
// so if it has data(or eos is true), this method must return true.
- private boolean sendCachedValues(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues,
+ private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues,
SelectStmt selectStmt, boolean isSendFields, boolean isEos)
throws Exception {
RowBatch batch = null;
boolean isSend = isSendFields;
- for (CacheBeProxy.CacheValue value : cacheValues) {
- batch = value.getRowBatch();
+ for (InternalService.PCacheValue value : cacheValues) {
+ TResultBatch resultBatch = new TResultBatch();
+ for (ByteString one : value.getRowsList()) {
+ resultBatch.addToRows(ByteBuffer.wrap(one.toByteArray()));
+ }
+ resultBatch.setPacketSeq(1);
+ resultBatch.setIsCompressed(false);
+ batch = new RowBatch();
+ batch.setBatch(resultBatch);
+ batch.setEos(true);
if (!isSend) {
// send meta fields before sending first data batch.
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
@@ -646,7 +655,7 @@ public class StmtExecutor {
if (isEos) {
if (batch != null) {
- statisticsForAuditLog = batch.getQueryStatistics();
+ statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
}
if (!isSend) {
sendFields(selectStmt.getColLabels(), exprToType(selectStmt.getResultExprs()));
@@ -662,20 +671,20 @@ public class StmtExecutor {
*/
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception {
RowBatch batch = null;
- CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
+ InternalService.PFetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
CacheMode mode = cacheAnalyzer.getCacheMode();
SelectStmt newSelectStmt = selectStmt;
boolean isSendFields = false;
if (cacheResult != null) {
isCached = true;
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
- sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, true);
+ sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, true);
return;
}
// rewrite sql
if (mode == CacheMode.Partition) {
if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) {
- isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
+ isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
}
newSelectStmt = cacheAnalyzer.getRewriteStmt();
newSelectStmt.reset();
@@ -710,7 +719,7 @@ public class StmtExecutor {
}
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
- isSendFields = sendCachedValues(channel, cacheResult.getValueList(), newSelectStmt, isSendFields, false);
+ isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
}
cacheAnalyzer.updateCache();
@@ -720,7 +729,7 @@ public class StmtExecutor {
isSendFields = true;
}
- statisticsForAuditLog = batch.getQueryStatistics();
+ statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
context.getState().setEof();
return;
}
@@ -802,7 +811,7 @@ public class StmtExecutor {
}
}
- statisticsForAuditLog = batch.getQueryStatistics();
+ statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
context.getState().setEof();
plannerProfile.setQueryFetchResultFinishTime();
}
@@ -1110,20 +1119,20 @@ public class StmtExecutor {
context.getCatalog().getExportMgr().addExportJob(exportStmt);
}
- public PQueryStatistics getQueryStatisticsForAuditLog() {
+ public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
if (statisticsForAuditLog == null) {
- statisticsForAuditLog = new PQueryStatistics();
+ statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
}
- if (statisticsForAuditLog.scan_bytes == null) {
- statisticsForAuditLog.scan_bytes = 0L;
+ if (!statisticsForAuditLog.hasScanBytes()) {
+ statisticsForAuditLog.setScanBytes(0L);
}
- if (statisticsForAuditLog.scan_rows == null) {
- statisticsForAuditLog.scan_rows = 0L;
+ if (!statisticsForAuditLog.hasScanRows()) {
+ statisticsForAuditLog.setScanRows(0L);
}
- if (statisticsForAuditLog.cpu_ms == null) {
- statisticsForAuditLog.cpu_ms = 0L;
+ if (!statisticsForAuditLog.hasCpuMs()) {
+ statisticsForAuditLog.setCpuMs(0L);
}
- return statisticsForAuditLog;
+ return statisticsForAuditLog.build();
}
private List<PrimitiveType> exprToType(List<Expr> exprs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
index c2a054e..1711179 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/Cache.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe.cache;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
@@ -50,7 +51,7 @@ public abstract class Cache {
hitRange = HitRange.None;
}
- public abstract CacheProxy.FetchCacheResult getCacheData(Status status);
+ public abstract InternalService.PFetchCacheResult getCacheData(Status status);
public HitRange getHitRange() {
return hitRange;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index ddd2725..d8a6f47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -40,6 +40,7 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
@@ -267,8 +268,7 @@ public class CacheAnalyzer {
return CacheMode.Partition;
}
- public CacheBeProxy.FetchCacheResult getCacheData() {
- CacheProxy.FetchCacheResult cacheResult = null;
+ public InternalService.PFetchCacheResult getCacheData() {
cacheMode = innerCheckCacheMode(0);
if (cacheMode == CacheMode.NoNeed) {
return null;
@@ -277,13 +277,18 @@ public class CacheAnalyzer {
return null;
}
Status status = new Status();
- cacheResult = cache.getCacheData(status);
-
- if (status.ok() && cacheResult != null) {
+ InternalService.PFetchCacheResult cacheResult = cache.getCacheData(status);
+ if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+ int rowCount = 0;
+ int dataSize = 0;
+ for (InternalService.PCacheValue value : cacheResult.getValuesList()) {
+ rowCount += value.getRowsCount();
+ dataSize += value.getDataSize();
+ }
LOG.debug("hit cache, mode {}, queryid {}, all count {}, value count {}, row count {}, data size {}",
cacheMode, DebugUtil.printId(queryId),
- cacheResult.all_count, cacheResult.value_count,
- cacheResult.row_count, cacheResult.data_size);
+ cacheResult.getAllCount(), cacheResult.getValuesCount(),
+ rowCount, dataSize);
} else {
LOG.debug("miss cache, mode {}, queryid {}, code {}, msg {}", cacheMode,
DebugUtil.printId(queryId), status.getErrorCode(), status.getErrorMsg());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
index 064c86b..d7e5eda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
@@ -18,14 +18,8 @@
package org.apache.doris.qe.cache;
import org.apache.doris.common.Status;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCacheStatus;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PClearType;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
@@ -48,8 +42,8 @@ import java.util.concurrent.TimeoutException;
public class CacheBeProxy extends CacheProxy {
private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
- public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status) {
- PUniqueId sqlKey = request.sql_key;
+ public void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status) {
+ Types.PUniqueId sqlKey = request.getSqlKey();
Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
if (backend == null) {
LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
@@ -57,13 +51,13 @@ public class CacheBeProxy extends CacheProxy {
}
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
- PUpdateCacheRequest updateRequest = request.getRpcRequest();
- Future<PCacheResponse> future = BackendServiceProxy.getInstance().updateCache(address, updateRequest);
- PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
- if (response.status == PCacheStatus.CACHE_OK) {
+ Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance()
+ .updateCache(address, request);
+ InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
+ if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
} else {
- status.setStatus(response.status.toString());
+ status.setStatus(response.getStatus().toString());
}
} catch (Exception e) {
LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
@@ -72,35 +66,18 @@ public class CacheBeProxy extends CacheProxy {
}
}
- public FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status) {
- PUniqueId sqlKey = request.sql_key;
+ public InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request,
+ int timeoutMs, Status status) {
+ Types.PUniqueId sqlKey = request.getSqlKey();
Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
if (backend == null) {
return null;
}
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
- long timeoutTs = System.currentTimeMillis() + timeoutMs;
- FetchCacheResult result = null;
try {
- PFetchCacheRequest fetchRequest = request.getRpcRequest();
- Future<PFetchCacheResult> future = BackendServiceProxy.getInstance().fetchCache(address, fetchRequest);
- PFetchCacheResult fetchResult = null;
- while (fetchResult == null) {
- long currentTs = System.currentTimeMillis();
- if (currentTs >= timeoutTs) {
- throw new TimeoutException("query cache timeout");
- }
- fetchResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
- if (fetchResult.status == PCacheStatus.CACHE_OK) {
- status = new Status(TStatusCode.OK, "");
- result = new FetchCacheResult();
- result.setResult(fetchResult);
- return result;
- } else {
- status.setStatus(fetchResult.status.toString());
- return null;
- }
- }
+ Future<InternalService.PFetchCacheResult> future = BackendServiceProxy.getInstance()
+ .fetchCache(address, request);
+ return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (RpcException e) {
LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.setRpcStatus(e.getMessage());
@@ -114,16 +91,15 @@ public class CacheBeProxy extends CacheProxy {
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.setStatus("query timeout");
- } finally {
}
- return result;
+ return null;
}
- public void clearCache(PClearCacheRequest request) {
+ public void clearCache(InternalService.PClearCacheRequest request) {
this.clearCache(request, CacheCoordinator.getInstance().getBackendList());
}
- public void clearCache(PClearCacheRequest request, List<Backend> beList) {
+ public void clearCache(InternalService.PClearCacheRequest request, List<Backend> beList) {
int retry;
Status status = new Status();
for (Backend backend : beList) {
@@ -143,18 +119,18 @@ public class CacheBeProxy extends CacheProxy {
}
}
- protected boolean clearCache(PClearCacheRequest request, Backend backend, int timeoutMs, Status status) {
+ protected boolean clearCache(InternalService.PClearCacheRequest request, Backend backend, int timeoutMs, Status status) {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
- request.clear_type = PClearType.CLEAR_ALL;
+ request = request.toBuilder().setClearType(InternalService.PClearType.CLEAR_ALL).build();
LOG.info("clear all backend cache, backendId {}", backend.getId());
- Future<PCacheResponse> future = BackendServiceProxy.getInstance().clearCache(address, request);
- PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
- if (response.status == PCacheStatus.CACHE_OK) {
+ Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance().clearCache(address, request);
+ InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MICROSECONDS);
+ if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
return true;
} else {
- status.setStatus(response.status.toString());
+ status.setStatus(response.getStatus().toString());
return false;
}
} catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
index 97a7301..dbbb403 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java
@@ -18,7 +18,7 @@
package org.apache.doris.qe.cache;
import org.apache.doris.catalog.Catalog;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.system.Backend;
@@ -68,13 +68,13 @@ public class CacheCoordinator {
* @param sqlKey 128 bit's sql md5
* @return Backend
*/
- public Backend findBackend(PUniqueId sqlKey) {
+ public Backend findBackend(Types.PUniqueId sqlKey) {
resetBackend();
Backend virtualNode = null;
try {
belock.lock();
- SortedMap<Long, Backend> headMap = virtualNodes.headMap(sqlKey.hi);
- SortedMap<Long, Backend> tailMap = virtualNodes.tailMap(sqlKey.hi);
+ SortedMap<Long, Backend> headMap = virtualNodes.headMap(sqlKey.getHi());
+ SortedMap<Long, Backend> tailMap = virtualNodes.tailMap(sqlKey.getHi());
int retryTimes = 0;
while (true) {
if (tailMap == null || tailMap.size() == 0) {
@@ -131,9 +131,9 @@ public class CacheCoordinator {
if (!idToBackend.containsKey(bid)) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String nodeName = String.valueOf(bid) + "::" + String.valueOf(i);
- PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
- virtualNodes.remove(nodeId.hi);
- LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.hi);
+ Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
+ virtualNodes.remove(nodeId.getHi());
+ LOG.debug("remove backend id {}, virtual node name {} hashcode {}", bid, nodeName, nodeId.getHi());
}
itr.remove();
}
@@ -147,9 +147,9 @@ public class CacheCoordinator {
realNodes.put(backend.getId(), backend);
for (int i = 0; i < VIRTUAL_NODES; i++) {
String nodeName = String.valueOf(backend.getId()) + "::" + String.valueOf(i);
- PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
- virtualNodes.put(nodeId.hi, backend);
- LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.hi);
+ Types.PUniqueId nodeId = CacheBeProxy.getMd5(nodeName);
+ virtualNodes.put(nodeId.getHi(), backend);
+ LOG.debug("add backend id {}, virtual node name {} hashcode {}", backend.getId(), nodeName, nodeId.getHi());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
index a750737..f9664d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheProxy.java
@@ -18,25 +18,14 @@
package org.apache.doris.qe.cache;
import org.apache.doris.common.Status;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.proto.PCacheParam;
-import org.apache.doris.proto.PCacheValue;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
-import org.apache.doris.qe.RowBatch;
-import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
-import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.List;
/**
* It encapsulates the request and response parameters and methods,
@@ -48,203 +37,6 @@ public abstract class CacheProxy {
public static int UPDATE_TIMEOUT = 10000;
public static int CLEAR_TIMEOUT = 30000;
- public static class CacheParam extends PCacheParam {
- public CacheParam(PCacheParam param) {
- partition_key = param.partition_key;
- last_version = param.last_version;
- last_version_time = param.last_version_time;
- }
-
- public CacheParam(long partitionKey, long lastVersion, long lastVersionTime) {
- partition_key = partitionKey;
- last_version = lastVersion;
- last_version_time = lastVersionTime;
- }
-
- public PCacheParam getParam() {
- PCacheParam param = new PCacheParam();
- param.partition_key = partition_key;
- param.last_version = last_version;
- param.last_version_time = last_version_time;
- return param;
- }
-
- public void debug() {
- LOG.info("cache param, part key {}, version {}, time {}",
- partition_key, last_version, last_version_time);
- }
- }
-
- public static class CacheValue extends PCacheValue {
- public CacheParam param;
- public TResultBatch resultBatch;
-
- public CacheValue() {
- param = null;
- rows = Lists.newArrayList();
- data_size = 0;
- resultBatch = new TResultBatch();
- }
-
- public void addRpcResult(PCacheValue value) {
- param = new CacheParam(value.param);
- data_size += value.data_size;
- rows.addAll(value.rows);
- }
-
- public RowBatch getRowBatch() {
- for (byte[] one : rows) {
- resultBatch.addToRows(ByteBuffer.wrap(one));
- }
- RowBatch batch = new RowBatch();
- resultBatch.setPacketSeq(1);
- resultBatch.setIsCompressed(false);
- batch.setBatch(resultBatch);
- batch.setEos(true);
- return batch;
- }
-
- public void addUpdateResult(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
- param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
- for (byte[] buf : rowList) {
- data_size += buf.length;
- rows.add(buf);
- }
- }
-
- public PCacheValue getRpcValue() {
- PCacheValue value = new PCacheValue();
- value.param = param.getParam();
- value.data_size = data_size;
- value.rows = rows;
- return value;
- }
-
- public void debug() {
- LOG.info("cache value, partkey {}, ver:{}, time {}, row_num {}, data_size {}",
- param.partition_key, param.last_version, param.last_version_time,
- rows.size(),
- data_size);
- for (int i = 0; i < rows.size(); i++) {
- LOG.info("{}:{}", i, rows.get(i));
- }
- }
- }
-
- public static class UpdateCacheRequest extends PUpdateCacheRequest {
- public int value_count;
- public int row_count;
- public int data_size;
- private List<CacheValue> valueList;
-
- public UpdateCacheRequest(String sqlStr) {
- this.sql_key = getMd5(sqlStr);
- this.valueList = Lists.newArrayList();
- value_count = 0;
- row_count = 0;
- data_size = 0;
- }
-
- public void addValue(long partitionKey, long lastVersion, long lastVersionTime, List<byte[]> rowList) {
- CacheValue value = new CacheValue();
- value.addUpdateResult(partitionKey, lastVersion, lastVersionTime, rowList);
- valueList.add(value);
- value_count++;
- }
-
- public PUpdateCacheRequest getRpcRequest() {
- value_count = valueList.size();
- PUpdateCacheRequest request = new PUpdateCacheRequest();
- request.values = Lists.newArrayList();
- request.sql_key = sql_key;
- for (CacheValue value : valueList) {
- request.values.add(value.getRpcValue());
- row_count += value.rows.size();
- data_size = value.data_size;
- }
- return request;
- }
-
- public void debug() {
- LOG.info("update cache request, sql_key {}, value_size {}", DebugUtil.printId(sql_key),
- valueList.size());
- for (CacheValue value : valueList) {
- value.debug();
- }
- }
- }
-
-
- public static class FetchCacheRequest extends PFetchCacheRequest {
- private List<CacheParam> paramList;
-
- public FetchCacheRequest(String sqlStr) {
- this.sql_key = getMd5(sqlStr);
- this.paramList = Lists.newArrayList();
- }
-
- public void addParam(long partitionKey, long lastVersion, long lastVersionTime) {
- CacheParam param = new CacheParam(partitionKey, lastVersion, lastVersionTime);
- paramList.add(param);
- }
-
- public PFetchCacheRequest getRpcRequest() {
- PFetchCacheRequest request = new PFetchCacheRequest();
- request.params = Lists.newArrayList();
- request.sql_key = sql_key;
- for (CacheParam param : paramList) {
- request.params.add(param.getParam());
- }
- return request;
- }
-
- public void debug() {
- LOG.info("fetch cache request, sql_key {}, param count {}", DebugUtil.printId(sql_key), paramList.size());
- for (CacheParam param : paramList) {
- param.debug();
- }
- }
- }
-
- public static class FetchCacheResult extends PFetchCacheResult {
- public int all_count;
- public int value_count;
- public int row_count;
- public int data_size;
- private List<CacheValue> valueList;
-
- public FetchCacheResult() {
- valueList = Lists.newArrayList();
- all_count = 0;
- value_count = 0;
- row_count = 0;
- data_size = 0;
- }
-
- public List<CacheValue> getValueList() {
- return valueList;
- }
-
- public void setResult(PFetchCacheResult rpcResult) {
- value_count = rpcResult.values.size();
- for (int i = 0; i < rpcResult.values.size(); i++) {
- PCacheValue rpcValue = rpcResult.values.get(i);
- CacheValue value = new CacheValue();
- value.addRpcResult(rpcValue);
- valueList.add(value);
- row_count += value.rows.size();
- data_size += value.data_size;
- }
- }
-
- public void debug() {
- LOG.info("fetch cache result, value size {}", valueList.size());
- for (CacheValue value : valueList) {
- value.debug();
- }
- }
- }
-
public enum CacheProxyType {
FE,
BE,
@@ -265,14 +57,15 @@ public abstract class CacheProxy {
return null;
}
- public abstract void updateCache(UpdateCacheRequest request, int timeoutMs, Status status);
+ public abstract void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status);
- public abstract FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status);
+ public abstract InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request,
+ int timeoutMs, Status status);
- public abstract void clearCache(PClearCacheRequest clearRequest);
+ public abstract void clearCache(InternalService.PClearCacheRequest clearRequest);
- public static PUniqueId getMd5(String str) {
+ public static Types.PUniqueId getMd5(String str) {
MessageDigest msgDigest;
try {
//128 bit
@@ -281,9 +74,10 @@ public abstract class CacheProxy {
return null;
}
final byte[] digest = msgDigest.digest(str.getBytes());
- PUniqueId key = new PUniqueId();
- key.lo = getLongFromByte(digest, 0);//64 bit
- key.hi = getLongFromByte(digest, 8);//64 bit
+ Types.PUniqueId key = Types.PUniqueId.newBuilder()
+ .setLo(getLongFromByte(digest, 0))
+ .setHi(getLongFromByte(digest, 8))
+ .build();
return key;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
index a820eb6..d208d77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
@@ -38,6 +39,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.stream.Collectors;
public class PartitionCache extends Cache {
private static final Logger LOG = LogManager.getLogger(PartitionCache.class);
@@ -73,33 +75,33 @@ public class PartitionCache extends Cache {
this.newRangeList = Lists.newArrayList();
}
- public CacheProxy.FetchCacheResult getCacheData(Status status) {
- CacheProxy.FetchCacheRequest request;
+ public InternalService.PFetchCacheResult getCacheData(Status status) {
+
rewriteSelectStmt(null);
- request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql());
range = new PartitionRange(this.partitionPredicate, this.olapTable,
this.partitionInfo);
if (!range.analytics()) {
status.setStatus("analytics range error");
return null;
}
-
- for (PartitionRange.PartitionSingle single : range.getPartitionSingleList()) {
- request.addParam(single.getCacheKey().realValue(),
- single.getPartition().getVisibleVersion(),
- single.getPartition().getVisibleVersionTime()
- );
- }
-
- CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
- if (status.ok() && cacheResult != null) {
- cacheResult.all_count = range.getPartitionSingleList().size();
- for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) {
- range.setCacheFlag(value.param.partition_key);
+ InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
+ .setSqlKey(CacheProxy.getMd5(nokeyStmt.toSql()))
+ .addAllParams(range.getPartitionSingleList().stream().map(
+ p -> InternalService.PCacheParam.newBuilder()
+ .setPartitionKey(p.getCacheKey().realValue())
+ .setLastVersion(p.getPartition().getVisibleVersion())
+ .setLastVersionTime(p.getPartition().getVisibleVersionTime())
+ .build()).collect(Collectors.toList())
+ ).build();
+ InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
+ if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+ for (InternalService.PCacheValue value : cacheResult.getValuesList()) {
+ range.setCacheFlag(value.getParam().getPartitionKey());
}
+ cacheResult = cacheResult.toBuilder().setAllCount(range.getPartitionSingleList().size()).build();
MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L);
MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) range.getPartitionSingleList().size());
- MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValueList().size());
+ MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) cacheResult.getValuesList().size());
}
range.setTooNewByID(latestTable.latestPartitionId);
@@ -125,15 +127,21 @@ public class PartitionCache extends Cache {
return;
}
- CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
- if (updateRequest.value_count > 0) {
+ InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
+ if (updateRequest.getValuesCount() > 0) {
CacheBeProxy proxy = new CacheBeProxy();
Status status = new Status();
proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
+ int rowCount = 0;
+ int dataSize = 0;
+ for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
+ rowCount += value.getRowsCount();
+ dataSize += value.getDataSize();
+ }
LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId),
- DebugUtil.printId(updateRequest.sql_key),
- updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
+ DebugUtil.printId(updateRequest.getSqlKey()),
+ updateRequest.getValuesCount(), rowCount, dataSize);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index e7f3a3a..444cef2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -17,11 +17,15 @@
package org.apache.doris.qe.cache;
-import com.google.common.collect.Lists;
import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.RowBatch;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -30,14 +34,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.stream.Collectors;
/**
- * According to the query partition range and cache hit, the rowbatch to update the cache is constructed
+ * According to the query partition range and cache hit, the rowbatch to update the cache is constructed
*/
public class RowBatchBuilder {
private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);
- private CacheBeProxy.UpdateCacheRequest updateRequest;
+ private InternalService.PUpdateCacheRequest updateRequest;
private CacheAnalyzer.CacheMode cacheMode;
private int keyIndex;
private Type keyType;
@@ -63,8 +68,8 @@ public class RowBatchBuilder {
}
public void buildPartitionIndex(ArrayList<Expr> resultExpr,
- List<String> columnLabel, Column partColumn,
- List<PartitionRange.PartitionSingle> newSingleList) {
+ List<String> columnLabel, Column partColumn,
+ List<PartitionRange.PartitionSingle> newSingleList) {
if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
return;
}
@@ -95,11 +100,19 @@ public class RowBatchBuilder {
}
}
- public CacheBeProxy.UpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) {
+ public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String sql, long partitionKey, long lastVersion, long lastestTime) {
if (updateRequest == null) {
- updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+ updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
}
- updateRequest.addValue(partitionKey, lastVersion, lastestTime, rowList);
+ updateRequest = updateRequest.toBuilder()
+ .addValues(InternalService.PCacheValue.newBuilder()
+ .setParam(InternalService.PCacheParam.newBuilder()
+ .setPartitionKey(partitionKey)
+ .setLastVersion(lastVersion)
+ .setLastVersionTime(lastestTime)
+ .build()).setDataSize(dataSize).addAllRows(
+ rowList.stream().map(row -> ByteString.copyFrom(row))
+ .collect(Collectors.toList()))).build();
return updateRequest;
}
@@ -115,7 +128,7 @@ public class RowBatchBuilder {
if (i == index) {
byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
String str = new String(content);
- key.init(type, str.toString());
+ key.init(type, str);
}
}
return key;
@@ -124,9 +137,9 @@ public class RowBatchBuilder {
/**
* Rowbatch split to Row
*/
- public CacheBeProxy.UpdateCacheRequest buildPartitionUpdateRequest(String sql) {
+ public InternalService.PUpdateCacheRequest buildPartitionUpdateRequest(String sql) {
if (updateRequest == null) {
- updateRequest = new CacheBeProxy.UpdateCacheRequest(sql);
+ updateRequest = InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
}
HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
List<byte[]> partitionRowList;
@@ -150,9 +163,17 @@ public class RowBatchBuilder {
Long key = entry.getKey();
PartitionRange.PartitionSingle partition = cachePartMap.get(key);
partitionRowList = entry.getValue();
- updateRequest.addValue(key, partition.getPartition().getVisibleVersion(),
- partition.getPartition().getVisibleVersionTime(), partitionRowList);
+ updateRequest = updateRequest.toBuilder()
+ .addValues(InternalService.PCacheValue.newBuilder()
+ .setParam(InternalService.PCacheParam.newBuilder()
+ .setPartitionKey(key)
+ .setLastVersion(partition.getPartition().getVisibleVersion())
+ .setLastVersionTime(partition.getPartition().getVisibleVersionTime())
+ .build()).addAllRows(
+ partitionRowList.stream().map(row -> ByteString.copyFrom(row))
+ .collect(Collectors.toList()))).build();
}
return updateRequest;
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
index dd67c6e..493ca68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
@@ -21,8 +21,10 @@ import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,13 +39,18 @@ public class SqlCache extends Cache {
this.latestTable = latestTable;
}
- public CacheProxy.FetchCacheResult getCacheData(Status status) {
- CacheProxy.FetchCacheRequest request = new CacheProxy.FetchCacheRequest(selectStmt.toSql());
- request.addParam(latestTable.latestPartitionId, latestTable.latestVersion,
- latestTable.latestTime);
- CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
- if (status.ok() && cacheResult != null) {
- cacheResult.all_count = 1;
+ public InternalService.PFetchCacheResult getCacheData(Status status) {
+ InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
+ .setSqlKey(CacheProxy.getMd5(selectStmt.toSql()))
+ .addParams(InternalService.PCacheParam.newBuilder()
+ .setPartitionKey(latestTable.latestPartitionId)
+ .setLastVersion(latestTable.latestVersion)
+ .setLastVersionTime(latestTable.latestTime))
+ .build();
+
+ InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
+ if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
+ cacheResult = cacheResult.toBuilder().setAllCount(1).build();
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
hitRange = HitRange.Full;
}
@@ -66,15 +73,21 @@ public class SqlCache extends Cache {
return;
}
- CacheBeProxy.UpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
+ InternalService.PUpdateCacheRequest updateRequest = rowBatchBuilder.buildSqlUpdateRequest(selectStmt.toSql(),
latestTable.latestPartitionId, latestTable.latestVersion, latestTable.latestTime);
- if (updateRequest.value_count > 0) {
+ if (updateRequest.getValuesCount() > 0) {
CacheBeProxy proxy = new CacheBeProxy();
Status status = new Status();
proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
+ int rowCount = 0;
+ int dataSize = 0;
+ for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
+ rowCount += value.getRowsCount();
+ dataSize += value.getDataSize();
+ }
LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
- CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.sql_key),
- updateRequest.value_count, updateRequest.row_count, updateRequest.data_size);
+ CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId), DebugUtil.printId(updateRequest.getSqlKey()),
+ updateRequest.getValuesCount(), rowCount, dataSize);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
new file mode 100644
index 0000000..e098a0d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.PBackendServiceGrpc;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import java.util.concurrent.Future;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+
+public class BackendServiceClient {
+ private static final int MAX_RETRY_NUM = 3;
+ private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
+ private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
+ private final ManagedChannel channel;
+
+ public BackendServiceClient(TNetworkAddress address) {
+ channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
+ .flowControlWindow(Config.grpc_max_message_size_bytes)
+ .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
+ .enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
+ .usePlaintext().build();
+ stub = PBackendServiceGrpc.newFutureStub(channel);
+ blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
+ }
+
+ public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
+ InternalService.PExecPlanFragmentRequest request) {
+ return stub.execPlanFragment(request);
+ }
+
+ public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+ InternalService.PCancelPlanFragmentRequest request) {
+ return stub.cancelPlanFragment(request);
+ }
+
+ public Future<InternalService.PFetchDataResult> fetchDataAsync(InternalService.PFetchDataRequest request) {
+ return stub.fetchData(request);
+ }
+
+ public InternalService.PFetchDataResult fetchDataSync(InternalService.PFetchDataRequest request) {
+ return blockingStub.fetchData(request);
+ }
+
+ public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
+ return stub.updateCache(request);
+ }
+
+ public Future<InternalService.PFetchCacheResult> fetchCache(InternalService.PFetchCacheRequest request) {
+ return stub.fetchCache(request);
+ }
+
+ public Future<InternalService.PCacheResponse> clearCache(InternalService.PClearCacheRequest request) {
+ return stub.clearCache(request);
+ }
+
+ public Future<InternalService.PTriggerProfileReportResult> triggerProfileReport(
+ InternalService.PTriggerProfileReportRequest request) {
+ return stub.triggerProfileReport(request);
+ }
+
+ public Future<InternalService.PProxyResult> getInfo(InternalService.PProxyRequest request) {
+ return stub.getInfo(request);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5d2c3af..1c4d785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,22 +17,8 @@
package org.apache.doris.rpc;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.util.JdkUtils;
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUniqueId;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
@@ -40,37 +26,20 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
-import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
-import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
-import com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClient;
-import com.baidu.jprotobuf.pbrpc.transport.RpcClientOptions;
import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.concurrent.Future;
public class BackendServiceProxy {
private static final Logger LOG = LogManager.getLogger(BackendServiceProxy.class);
-
- private RpcClient rpcClient;
- // TODO(zc): use TNetworkAddress,
- private Map<TNetworkAddress, PBackendService> serviceMap;
-
private static volatile BackendServiceProxy INSTANCE;
-
- static {
- int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
- JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion)));
- }
+ private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
public BackendServiceProxy() {
- final RpcClientOptions rpcOptions = new RpcClientOptions();
- rpcOptions.setMaxWait(Config.brpc_idle_wait_max_time);
- rpcOptions.setThreadPoolSize(Config.brpc_number_of_concurrent_requests_processed);
- rpcClient = new RpcClient(rpcOptions);
serviceMap = Maps.newHashMap();
}
@@ -85,42 +54,24 @@ public class BackendServiceProxy {
return INSTANCE;
}
- private synchronized PBackendService getProxy(TNetworkAddress address) {
- PBackendService service = serviceMap.get(address);
+ private synchronized BackendServiceClient getProxy(TNetworkAddress address) {
+ BackendServiceClient service = serviceMap.get(address);
if (service != null) {
return service;
}
- ProtobufRpcProxy<PBackendService> proxy = new ProtobufRpcProxy(rpcClient, PBackendService.class);
- proxy.setHost(address.getHostname());
- proxy.setPort(address.getPort());
- service = proxy.proxy();
+ service = new BackendServiceClient(address);
serviceMap.put(address, service);
return service;
}
- public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
+ public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
TNetworkAddress address, TExecPlanFragmentParams tRequest)
throws TException, RpcException {
- final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest();
- pRequest.setRequest(tRequest);
+ final InternalService.PExecPlanFragmentRequest pRequest = InternalService.PExecPlanFragmentRequest.newBuilder()
+ .setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
try {
- final PBackendService service = getProxy(address);
- return service.execPlanFragmentAsync(pRequest);
- } catch (NoSuchElementException e) {
- try {
- // retry
- try {
- Thread.sleep(10);
- } catch (InterruptedException interruptedException) {
- // do nothing
- }
- final PBackendService service = getProxy(address);
- return service.execPlanFragmentAsync(pRequest);
- } catch (NoSuchElementException noSuchElementException) {
- LOG.warn("Execute plan fragment retry failed, address={}:{}",
- address.getHostname(), address.getPort(), noSuchElementException);
- throw new RpcException(address.hostname, e.getMessage());
- }
+ final BackendServiceClient client = getProxy(address);
+ return client.execPlanFragmentAsync(pRequest);
} catch (Throwable e) {
LOG.warn("Execute plan fragment catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -128,32 +79,17 @@ public class BackendServiceProxy {
}
}
- public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
- TNetworkAddress address, TUniqueId finstId, PPlanFragmentCancelReason cancelReason) throws RpcException {
- final PCancelPlanFragmentRequest pRequest = new PCancelPlanFragmentRequest();
- PUniqueId uid = new PUniqueId();
- uid.hi = finstId.hi;
- uid.lo = finstId.lo;
- pRequest.finst_id = uid;
- pRequest.cancel_reason = cancelReason;
+ public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
+ TNetworkAddress address, TUniqueId finstId, InternalService.PPlanFragmentCancelReason cancelReason)
+ throws RpcException {
+ final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+ .newBuilder()
+ .setFinstId(
+ Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
+ .setCancelReason(cancelReason).build();
try {
- final PBackendService service = getProxy(address);
- return service.cancelPlanFragmentAsync(pRequest);
- } catch (NoSuchElementException e) {
- // retry
- try {
- try {
- Thread.sleep(10);
- } catch (InterruptedException interruptedException) {
- // do nothing
- }
- final PBackendService service = getProxy(address);
- return service.cancelPlanFragmentAsync(pRequest);
- } catch (NoSuchElementException noSuchElementException) {
- LOG.warn("Cancel plan fragment retry failed, address={}:{}",
- address.getHostname(), address.getPort(), noSuchElementException);
- throw new RpcException(address.hostname, e.getMessage());
- }
+ final BackendServiceClient client = getProxy(address);
+ return client.cancelPlanFragmentAsync(pRequest);
} catch (Throwable e) {
LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -161,11 +97,11 @@ public class BackendServiceProxy {
}
}
- public Future<PFetchDataResult> fetchDataAsync(
- TNetworkAddress address, PFetchDataRequest request) throws RpcException {
+ public Future<InternalService.PFetchDataResult> fetchDataAsync(
+ TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
try {
- PBackendService service = getProxy(address);
- return service.fetchDataAsync(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.fetchDataAsync(request);
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -173,11 +109,23 @@ public class BackendServiceProxy {
}
}
- public Future<PCacheResponse> updateCache(
- TNetworkAddress address, PUpdateCacheRequest request) throws RpcException{
+ public InternalService.PFetchDataResult fetchDataSync(
+ TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
try {
- PBackendService service = getProxy(address);
- return service.updateCache(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.fetchDataSync(request);
+ } catch (Throwable e) {
+ LOG.warn("fetch data catch a exception, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
+ public Future<InternalService.PCacheResponse> updateCache(
+ TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.updateCache(request);
} catch (Throwable e) {
LOG.warn("update cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -185,11 +133,11 @@ public class BackendServiceProxy {
}
}
- public Future<PFetchCacheResult> fetchCache(
- TNetworkAddress address, PFetchCacheRequest request) throws RpcException {
+ public Future<InternalService.PFetchCacheResult> fetchCache(
+ TNetworkAddress address, InternalService.PFetchCacheRequest request) throws RpcException {
try {
- PBackendService service = getProxy(address);
- return service.fetchCache(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.fetchCache(request);
} catch (Throwable e) {
LOG.warn("fetch cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -197,11 +145,11 @@ public class BackendServiceProxy {
}
}
- public Future<PCacheResponse> clearCache(
- TNetworkAddress address, PClearCacheRequest request) throws RpcException {
+ public Future<InternalService.PCacheResponse> clearCache(
+ TNetworkAddress address, InternalService.PClearCacheRequest request) throws RpcException {
try {
- PBackendService service = getProxy(address);
- return service.clearCache(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.clearCache(request);
} catch (Throwable e) {
LOG.warn("clear cache catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -209,12 +157,11 @@ public class BackendServiceProxy {
}
}
-
- public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
- TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
+ public Future<InternalService.PTriggerProfileReportResult> triggerProfileReportAsync(
+ TNetworkAddress address, InternalService.PTriggerProfileReportRequest request) throws RpcException {
try {
- final PBackendService service = getProxy(address);
- return service.triggerProfileReport(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.triggerProfileReport(request);
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
@@ -222,11 +169,11 @@ public class BackendServiceProxy {
}
}
- public Future<PProxyResult> getInfo(
- TNetworkAddress address, PProxyRequest request) throws RpcException {
+ public Future<InternalService.PProxyResult> getInfo(
+ TNetworkAddress address, InternalService.PProxyRequest request) throws RpcException {
try {
- final PBackendService service = getProxy(address);
- return service.getInfo(request);
+ final BackendServiceClient client = getProxy(address);
+ return client.getInfo(request);
} catch (Throwable e) {
LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
deleted file mode 100644
index 6e95fe6..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PClearCacheRequest;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchCacheRequest;
-import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.proto.PUpdateCacheRequest;
-
-import com.baidu.jprotobuf.pbrpc.ProtobufRPC;
-
-import java.util.concurrent.Future;
-
-public interface PBackendService {
- @ProtobufRPC(serviceName = "PBackendService", methodName = "exec_plan_fragment",
- attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
- Future<PExecPlanFragmentResult> execPlanFragmentAsync(PExecPlanFragmentRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "cancel_plan_fragment",
- onceTalkTimeout = 5000)
- Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(PCancelPlanFragmentRequest request);
-
- // we set timeout to 1 day, because now there is no way to give different timeout for each RPC call
- @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_data",
- attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000)
- Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "update_cache", onceTalkTimeout = 10000)
- Future<PCacheResponse> updateCache(PUpdateCacheRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_cache", onceTalkTimeout = 10000)
- Future<PFetchCacheResult> fetchCache(PFetchCacheRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "clear_cache", onceTalkTimeout = 10000)
- Future<PCacheResponse> clearCache(PClearCacheRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report",
- attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
- Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request);
-
- @ProtobufRPC(serviceName = "PBackendService", methodName = "get_info", onceTalkTimeout = 10000)
- Future<PProxyResult> getInfo(PProxyRequest request);
-}
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java
deleted file mode 100644
index a183795..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PExecPlanFragmentRequest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-
-@ProtobufClass
-public class PExecPlanFragmentRequest extends AttachmentRequest {
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java
deleted file mode 100644
index 7cad9d4..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PFetchDataRequest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PUniqueId;
-
-import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-
-@ProtobufClass
-public class PFetchDataRequest extends AttachmentRequest {
- public PFetchDataRequest() {}
- public PFetchDataRequest(PUniqueId finstId) {
- this.finstId = finstId;
- }
- @Protobuf(order = 1, required = true)
- public PUniqueId finstId;
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
deleted file mode 100644
index 2bfce90..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import org.apache.doris.proto.PUniqueId;
-
-import com.baidu.bjf.remoting.protobuf.FieldType;
-import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-@ProtobufClass
-public class PTriggerProfileReportRequest extends AttachmentRequest {
-
- @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false)
- List<PUniqueId> instanceIds;
-
- public PTriggerProfileReportRequest() {
- }
-
- public PTriggerProfileReportRequest(List<PUniqueId> instanceIds) {
- this.instanceIds = Lists.newArrayList();
- this.instanceIds.addAll(instanceIds);
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java
deleted file mode 100644
index a6f05c9..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/ThriftClientAttachmentHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import com.baidu.jprotobuf.pbrpc.ClientAttachmentHandler;
-
-public class ThriftClientAttachmentHandler implements ClientAttachmentHandler {
- @Override
- public byte[] handleRequest(String serviceName, String methodName, Object... objects) {
- AttachmentRequest request = (AttachmentRequest) objects[0];
- return request.getSerializedRequest();
- }
- @Override
- public void handleResponse(byte[] bytes, String serviceName, String methodName, Object... objects) {
- AttachmentRequest result = (AttachmentRequest) objects[0];
- result.setSerializedResult(bytes);
- }
-}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index c6bd97c..e979c0c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1469,7 +1469,7 @@ public class QueryPlanTest {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql);
System.out.println("wangxixu-explain:"+explainString);
Assert.assertTrue(explainString.contains("PREDICATES: `query_time` < 1614614400, `query_time` >= 0"));
-
+
}
@Test
@@ -1493,6 +1493,3 @@ public class QueryPlanTest {
}
}
}
-
-
-
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index 7421608..2a7fe57 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -28,7 +28,7 @@ import org.apache.doris.mysql.MysqlErrPacket;
import org.apache.doris.mysql.MysqlOkPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
-import org.apache.doris.proto.PQueryStatistics;
+import org.apache.doris.proto.Data;
import org.apache.doris.thrift.TUniqueId;
import org.junit.Assert;
@@ -55,7 +55,7 @@ public class ConnectProcessorTest {
@Mocked
private static SocketChannel socketChannel;
- private static PQueryStatistics statistics = new PQueryStatistics();
+ private static Data.PQueryStatistics statistics = Data.PQueryStatistics.newBuilder().build();
@BeforeClass
public static void setUpClass() {
@@ -97,10 +97,7 @@ public class ConnectProcessorTest {
serializer.writeEofString("");
fieldListPacket = serializer.toByteBuffer();
}
-
- statistics.scan_bytes = 0L;
- statistics.scan_rows = 0L;
- statistics.cpu_ms = 0L;
+ statistics = statistics.toBuilder().setCpuMs(0L).setScanRows(0).setScanBytes(0).build();
MetricRepo.init();
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
index 4f1525f..f2959f8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
@@ -52,7 +52,7 @@ import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
-import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.Types;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
@@ -433,15 +433,12 @@ public class PartitionCacheTest {
cp.addBackend(bd2);
cp.addBackend(bd3);
- PUniqueId key1 = new PUniqueId();
- key1.hi = 1L;
- key1.lo = 1L;
+ Types.PUniqueId key1 = Types.PUniqueId.newBuilder().setHi(1L).setLo(1L).build();
Backend bk = cp.findBackend(key1);
Assert.assertNotNull(bk);
Assert.assertEquals(bk.getId(),3);
-
- key1.hi = 669560558156283345L;
- key1.lo = 1L;
+
+ key1 = key1.toBuilder().setHi(669560558156283345L).build();
bk = cp.findBackend(key1);
Assert.assertNotNull(bk);
Assert.assertEquals(bk.getId(),1);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
index 844828b..0924fdc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackend.java
@@ -18,15 +18,14 @@
package org.apache.doris.utframe;
import org.apache.doris.common.ThriftServer;
-import org.apache.doris.common.util.JdkUtils;
+import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.utframe.MockedBackendFactory.BeThriftService;
-import com.baidu.bjf.remoting.protobuf.utils.JDKCompilerHelper;
-import com.baidu.bjf.remoting.protobuf.utils.compiler.JdkCompiler;
-import com.baidu.jprotobuf.pbrpc.transport.RpcServer;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
import org.apache.thrift.TProcessor;
@@ -48,7 +47,7 @@ public class MockedBackend {
private ThriftServer heartbeatServer;
private ThriftServer beThriftServer;
- private RpcServer rpcServer;
+ private Server backendServer;
private String host;
private int heartbeatPort;
@@ -59,15 +58,10 @@ public class MockedBackend {
// This must be set explicitly after creating mocked Backend
private TNetworkAddress feAddress;
- static {
- int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
- JDKCompilerHelper.setCompiler(new JdkCompiler(JdkCompiler.class.getClassLoader(), String.valueOf(javaRuntimeVersion)));
- }
-
public MockedBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
HeartbeatService.Iface hbService,
- BeThriftService backendService,
- Object pBackendService) throws IOException {
+ BeThriftService backendService, PBackendServiceGrpc.PBackendServiceImplBase pBackendService)
+ throws IOException {
this.host = host;
this.heartbeatPort = heartbeatPort;
@@ -116,7 +110,7 @@ public class MockedBackend {
System.out.println("Be heartbeat service is started with port: " + heartbeatPort);
beThriftServer.start();
System.out.println("Be thrift service is started with port: " + thriftPort);
- rpcServer.start(brpcPort);
+ backendServer.start();
System.out.println("Be brpc service is started with port: " + brpcPort);
}
@@ -130,8 +124,8 @@ public class MockedBackend {
beThriftServer = new ThriftServer(beThriftPort, tprocessor);
}
- private void createBrpcService(int brpcPort, Object pBackendServiceImpl) {
- rpcServer = new RpcServer();
- rpcServer.registerService(pBackendServiceImpl);
+ private void createBrpcService(int brpcPort, PBackendServiceGrpc.PBackendServiceImplBase pBackendServiceImpl) {
+ backendServer = ServerBuilder.forPort(brpcPort)
+ .addService(pBackendServiceImpl).build();
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 06b6ecd..2dd7e80 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -18,18 +18,10 @@
package org.apache.doris.utframe;
import org.apache.doris.common.ClientPool;
-import org.apache.doris.proto.PCancelPlanFragmentRequest;
-import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PExecPlanFragmentResult;
-import org.apache.doris.proto.PFetchDataResult;
-import org.apache.doris.proto.PProxyRequest;
-import org.apache.doris.proto.PProxyResult;
-import org.apache.doris.proto.PQueryStatistics;
-import org.apache.doris.proto.PStatus;
-import org.apache.doris.proto.PTriggerProfileReportResult;
-import org.apache.doris.rpc.PExecPlanFragmentRequest;
-import org.apache.doris.rpc.PFetchDataRequest;
-import org.apache.doris.rpc.PTriggerProfileReportRequest;
+import org.apache.doris.proto.Data;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.PBackendServiceGrpc;
+import org.apache.doris.proto.Status;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
@@ -70,9 +62,9 @@ import org.apache.doris.thrift.TTransmitDataParams;
import org.apache.doris.thrift.TTransmitDataResult;
import org.apache.doris.thrift.TUniqueId;
-import com.baidu.jprotobuf.pbrpc.ProtobufRPCService;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+import io.grpc.stub.StreamObserver;
import org.apache.thrift.TException;
@@ -94,16 +86,10 @@ public class MockedBackendFactory {
public static final int BE_DEFAULT_BRPC_PORT = 8060;
public static final int BE_DEFAULT_HTTP_PORT = 8040;
- // create a default mocked backend with 3 default rpc services
- public static MockedBackend createDefaultBackend() throws IOException {
- return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT,
- new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT),
- new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
- }
-
// create a mocked backend with customize parameters
public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
- HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService)
+ HeartbeatService.Iface hbService, BeThriftService beThriftService,
+ PBackendServiceGrpc.PBackendServiceImplBase pBackendService)
throws IOException {
MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService,
beThriftService, pBackendService);
@@ -284,54 +270,94 @@ public class MockedBackendFactory {
}
// The default Brpc service.
- // TODO(cmy): Currently this service cannot correctly simulate the processing of query requests.
- public static class DefaultPBackendServiceImpl {
- @ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment")
- public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) {
+ public static class DefaultPBackendServiceImpl extends PBackendServiceGrpc.PBackendServiceImplBase {
+ @Override
+ public void transmitData(InternalService.PTransmitDataParams request, StreamObserver<InternalService.PTransmitDataResult> responseObserver) {
+ responseObserver.onNext(InternalService.PTransmitDataResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
System.out.println("get exec_plan_fragment request");
- PExecPlanFragmentResult result = new PExecPlanFragmentResult();
- PStatus pStatus = new PStatus();
- pStatus.status_code = 0;
- result.status = pStatus;
- return result;
+ responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+ responseObserver.onCompleted();
}
- @ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment")
- public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) {
+ @Override
+ public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
System.out.println("get cancel_plan_fragment request");
- PCancelPlanFragmentResult result = new PCancelPlanFragmentResult();
- PStatus pStatus = new PStatus();
- pStatus.status_code = 0;
- result.status = pStatus;
- return result;
+ responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+ responseObserver.onCompleted();
}
- @ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data")
- public PFetchDataResult fetchDataAsync(PFetchDataRequest request) {
- System.out.println("get fetch_data");
- PFetchDataResult result = new PFetchDataResult();
- PStatus pStatus = new PStatus();
- pStatus.status_code = 0;
+ @Override
+ public void fetchData(InternalService.PFetchDataRequest request, StreamObserver<InternalService.PFetchDataResult> responseObserver) {
+ System.out.println("get fetch_data request");
+ responseObserver.onNext(InternalService.PFetchDataResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0))
+ .setQueryStatistics(Data.PQueryStatistics.newBuilder()
+ .setScanRows(0L)
+ .setScanBytes(0L))
+ .setEos(true)
+ .setPacketSeq(0L)
+ .build());
+ responseObserver.onCompleted();
+ }
- PQueryStatistics pQueryStatistics = new PQueryStatistics();
- pQueryStatistics.scan_rows = 0L;
- pQueryStatistics.scan_bytes = 0L;
+ @Override
+ public void tabletWriterOpen(InternalService.PTabletWriterOpenRequest request, StreamObserver<InternalService.PTabletWriterOpenResult> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
+ }
- result.status = pStatus;
- result.packet_seq = 0L;
- result.query_statistics = pQueryStatistics;
- result.eos = true;
- return result;
+ @Override
+ public void tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request, StreamObserver<InternalService.PTabletWriterAddBatchResult> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
}
- @ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report")
- public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest request) {
- return null;
+ @Override
+ public void tabletWriterCancel(InternalService.PTabletWriterCancelRequest request, StreamObserver<InternalService.PTabletWriterCancelResult> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
}
- @ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info")
- public PProxyResult getInfo(PProxyRequest request) {
- return null;
+ @Override
+ public void triggerProfileReport(InternalService.PTriggerProfileReportRequest request, StreamObserver<InternalService.PTriggerProfileReportResult> responseObserver) {
+ System.out.println("get triggerProfileReport request");
+ responseObserver.onNext(InternalService.PTriggerProfileReportResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getInfo(InternalService.PProxyRequest request, StreamObserver<InternalService.PProxyResult> responseObserver) {
+ System.out.println("get get_info request");
+ responseObserver.onNext(InternalService.PProxyResult.newBuilder()
+ .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void updateCache(InternalService.PUpdateCacheRequest request, StreamObserver<InternalService.PCacheResponse> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void fetchCache(InternalService.PFetchCacheRequest request, StreamObserver<InternalService.PFetchCacheResult> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void clearCache(InternalService.PClearCacheRequest request, StreamObserver<InternalService.PCacheResponse> responseObserver) {
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
}
}
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 24c949e..2a8f9ea 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -48,7 +48,8 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <jprotobuf.version>2.2.11</jprotobuf.version>
+ <grpc.version>1.30.0</grpc.version>
+ <protobuf.version>3.5.1</protobuf.version>
<skip.plugin>false</skip.plugin>
</properties>
@@ -273,40 +274,12 @@ under the License.
<version>2.10.1</version>
</dependency>
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf</artifactId>
- <version>${jprotobuf.version}</version>
- <classifier>jar-with-dependencies</classifier>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf-rpc-common</artifactId>
- <version>1.8</version>
- </dependency>
-
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
- <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-core -->
- <dependency>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf-rpc-core</artifactId>
- <version>3.5.21</version>
- <exclusions>
- <exclusion>
- <groupId>com.baidu</groupId>
- <artifactId>jprotobuf</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
@@ -386,11 +359,32 @@ under the License.
<version>2.1</version>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>3.5.1</version>
+ <version>${protobuf.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup/protoparser -->
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d5da5f4..24560e0 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -112,6 +112,7 @@ message PTabletWriterCancelResult {
};
message PExecPlanFragmentRequest {
+ optional bytes request = 1;
};
message PExecPlanFragmentResult {
@@ -137,6 +138,7 @@ message PCancelPlanFragmentResult {
message PFetchDataRequest {
required PUniqueId finst_id = 1;
+ optional bool resp_in_attachment = 2;
};
message PFetchDataResult {
@@ -145,6 +147,8 @@ message PFetchDataResult {
optional int64 packet_seq = 2;
optional bool eos = 3;
optional PQueryStatistics query_statistics = 4;
+ optional bytes row_batch = 5;
+ optional bool empty_batch = 6;
};
//Add message definition to fetch and update cache
@@ -190,6 +194,7 @@ message PFetchCacheRequest {
message PFetchCacheResult {
required PCacheStatus status = 1;
repeated PCacheValue values = 2;
+ optional int64 all_count = 3 [default = 0];
};
enum PClearType {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org