You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/11 02:53:22 UTC
[incubator-doris] branch master updated: [internal] [doris-1084] support compressed csv file in stream load (#5463)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8fbe5 [internal] [doris-1084] support compressed csv file in stream load (#5463)
7a8fbe5 is described below
commit 7a8fbe5db81011e33755a3238fb55f473e42ae80
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Thu Mar 11 10:53:05 2021 +0800
[internal] [doris-1084] support compressed csv file in stream load (#5463)
---
be/src/exec/broker_scanner.cpp | 6 ++---
be/src/http/action/stream_load.cpp | 54 +++++++++++++++++++++++++++++---------
be/src/http/http_common.h | 1 +
gensrc/thrift/PlanNodes.thrift | 2 +-
4 files changed, 46 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index f732244..e14f1fb 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -192,7 +192,7 @@ Status BrokerScanner::open_file_reader() {
}
Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
- if (_cur_decompressor == nullptr) {
+ if (_cur_decompressor != nullptr) {
delete _cur_decompressor;
_cur_decompressor = nullptr;
}
@@ -220,7 +220,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
break;
default: {
std::stringstream ss;
- ss << "Unknown format type, type=" << type;
+ ss << "Unknown format type, cannot inference compress type, type=" << type;
return Status::InternalError(ss.str());
}
}
@@ -271,7 +271,7 @@ Status BrokerScanner::open_line_reader() {
break;
default: {
std::stringstream ss;
- ss << "Unknown format type, type=" << range.format_type;
+ ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
return Status::InternalError(ss.str());
}
}
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index fbcbba5..164676c 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -69,18 +69,46 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit
TStreamLoadPutResult k_stream_load_put_result;
#endif
-static TFileFormatType::type parse_format(const std::string& format_str) {
+static TFileFormatType::type parse_format(const std::string& format_str,
+ const std::string& compress_type) {
+ if (format_str.empty()) {
+ return parse_format("CSV", compress_type);
+ }
+ TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
if (boost::iequals(format_str, "CSV")) {
- return TFileFormatType::FORMAT_CSV_PLAIN;
+ if (compress_type.empty()) {
+ format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+ }
+ if (boost::iequals(compress_type, "GZ")) {
+ format_type = TFileFormatType::FORMAT_CSV_GZ;
+ } else if (boost::iequals(compress_type, "LZO")) {
+ format_type = TFileFormatType::FORMAT_CSV_LZO;
+ } else if (boost::iequals(compress_type, "BZ2")) {
+ format_type = TFileFormatType::FORMAT_CSV_BZ2;
+ } else if (boost::iequals(compress_type, "LZ4FRAME")) {
+ format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+ } else if (boost::iequals(compress_type, "LZOP")) {
+ format_type = TFileFormatType::FORMAT_CSV_LZOP;
+ } else if (boost::iequals(compress_type, "DEFLATE")) {
+ format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+ }
} else if (boost::iequals(format_str, "JSON")) {
- return TFileFormatType::FORMAT_JSON;
+ if (compress_type.empty()) {
+ format_type = TFileFormatType::FORMAT_JSON;
+ }
}
- return TFileFormatType::FORMAT_UNKNOWN;
+ return format_type;
}
static bool is_format_support_streaming(TFileFormatType::type format) {
switch (format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_DEFLATE:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZO:
+ case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_JSON:
return true;
default:
@@ -214,15 +242,15 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
}
// get format of this put
- if (http_req->header(HTTP_FORMAT_KEY).empty()) {
- ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
- } else {
- ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY));
- if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
- std::stringstream ss;
- ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY);
- return Status::InternalError(ss.str());
- }
+ if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
+ return Status::InternalError("compress data of JSON format is not supported.");
+ }
+ ctx->format =
+ parse_format(http_req->header(HTTP_FORMAT_KEY), http_req->header(HTTP_COMPRESS_TYPE));
+ if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
+ std::stringstream ss;
+ ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY);
+ return Status::InternalError(ss.str());
}
// check content length
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index d9bf046..14f3e17 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -47,6 +47,7 @@ static const std::string HTTP_MERGE_TYPE = "merge_type";
static const std::string HTTP_DELETE_CONDITION = "delete";
static const std::string HTTP_FUNCTION_COLUMN = "function_column";
static const std::string HTTP_SEQUENCE_COL = "sequence_col";
+static const std::string HTTP_COMPRESS_TYPE = "compress_type";
static const std::string HTTP_100_CONTINUE = "100-continue";
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 96e1212..6dbdad2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -105,7 +105,7 @@ enum TFileFormatType {
FORMAT_PARQUET,
FORMAT_CSV_DEFLATE,
FORMAT_ORC,
- FORMAT_JSON
+ FORMAT_JSON,
}
// One broker range information.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org