You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wy...@apache.org on 2021/07/13 03:18:05 UTC
[incubator-doris] branch master updated: [Bug] Filter out
unavaliable backends when getting tablet location (#6204)
This is an automated email from the ASF dual-hosted git repository.
wyf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d79cdc8 [Bug] Filter out unavaliable backends when getting tablet location (#6204)
d79cdc8 is described below
commit d79cdc829ffcc742a9331fb4705fa93bce1dbd85
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Jul 13 11:17:49 2021 +0800
[Bug] Filter out unavaliable backends when getting tablet location (#6204)
* [Bug] Filter out unavaiable backends when getting scan range location
In the previous implementation, we will eliminate non-surviving BEs in the Coordinator phase.
But for Spark or Flink Connector, there is no such logic, so when a BE node is down,
it will cause the problem of querying errors through the Connector.
* fix ut
* fix compiule
---
be/src/http/action/stream_load.cpp | 10 ++++++++--
.../src/main/java/org/apache/doris/planner/OlapScanNode.java | 5 +++--
.../src/test/java/org/apache/doris/http/DorisHttpTestCase.java | 3 +++
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index f1c8e2e..ac409b1 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -268,15 +268,21 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
ctx->body_bytes = 0;
size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
+ bool read_json_by_line = false;
+ if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
+ if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
+ read_json_by_line = true;
+ }
+ }
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
// json max body size
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
- (ctx->body_bytes > json_max_body_bytes)) {
+ (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
std::stringstream ss;
ss << "The size of this batch exceed the max size [" << json_max_body_bytes
<< "] of json type data "
- << " data [ " << ctx->body_bytes << " ]";
+ << " data [ " << ctx->body_bytes << " ]. Split the file, or use 'read_json_by_line'";
return Status::InternalError(ss.str());
}
// csv max body size
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 143c175..62948c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -430,8 +430,9 @@ public class OlapScanNode extends ScanNode {
boolean collectedStat = false;
for (Replica replica : replicas) {
Backend backend = Catalog.getCurrentSystemInfo().getBackend(replica.getBackendId());
- if (backend == null) {
- LOG.debug("replica {} not exists", replica.getBackendId());
+ if (backend == null || !backend.isAlive()) {
+ LOG.debug("backend {} not exists or is not alive for replica {}",
+ replica.getBackendId(), replica.getId());
continue;
}
String ip = backend.getHost();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index 94fa7f0..14abfbc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -263,10 +263,13 @@ abstract public class DorisHttpTestCase {
private static void assignBackends() {
Backend backend1 = new Backend(testBackendId1, "node-1", 9308);
backend1.setBePort(9300);
+ backend1.setAlive(true);
Backend backend2 = new Backend(testBackendId2, "node-2", 9308);
backend2.setBePort(9300);
+ backend2.setAlive(true);
Backend backend3 = new Backend(testBackendId3, "node-3", 9308);
backend3.setBePort(9300);
+ backend3.setAlive(true);
Catalog.getCurrentSystemInfo().addBackend(backend1);
Catalog.getCurrentSystemInfo().addBackend(backend2);
Catalog.getCurrentSystemInfo().addBackend(backend3);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org