You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/11 02:53:22 UTC

[incubator-doris] branch master updated: [internal] [doris-1084] support compressed csv file in stream load (#5463)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8fbe5  [internal] [doris-1084] support compressed csv file in stream load (#5463)
7a8fbe5 is described below

commit 7a8fbe5db81011e33755a3238fb55f473e42ae80
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Thu Mar 11 10:53:05 2021 +0800

    [internal] [doris-1084] support compressed csv file in stream load (#5463)
---
 be/src/exec/broker_scanner.cpp     |  6 ++---
 be/src/http/action/stream_load.cpp | 54 +++++++++++++++++++++++++++++---------
 be/src/http/http_common.h          |  1 +
 gensrc/thrift/PlanNodes.thrift     |  2 +-
 4 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index f732244..e14f1fb 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -192,7 +192,7 @@ Status BrokerScanner::open_file_reader() {
 }
 
 Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
-    if (_cur_decompressor == nullptr) {
+    if (_cur_decompressor != nullptr) {
         delete _cur_decompressor;
         _cur_decompressor = nullptr;
     }
@@ -220,7 +220,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
         break;
     default: {
         std::stringstream ss;
-        ss << "Unknown format type, type=" << type;
+        ss << "Unknown format type, cannot inference compress type, type=" << type;
         return Status::InternalError(ss.str());
     }
     }
@@ -271,7 +271,7 @@ Status BrokerScanner::open_line_reader() {
         break;
     default: {
         std::stringstream ss;
-        ss << "Unknown format type, type=" << range.format_type;
+        ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
         return Status::InternalError(ss.str());
     }
     }
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index fbcbba5..164676c 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -69,18 +69,46 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
 
-static TFileFormatType::type parse_format(const std::string& format_str) {
+static TFileFormatType::type parse_format(const std::string& format_str,
+                                          const std::string& compress_type) {
+    if (format_str.empty()) {
+        return parse_format("CSV", compress_type);
+    }
+    TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
     if (boost::iequals(format_str, "CSV")) {
-        return TFileFormatType::FORMAT_CSV_PLAIN;
+        if (compress_type.empty()) {
+            format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        }
+        if (boost::iequals(compress_type, "GZ")) {
+            format_type = TFileFormatType::FORMAT_CSV_GZ;
+        } else if (boost::iequals(compress_type, "LZO")) {
+            format_type = TFileFormatType::FORMAT_CSV_LZO;
+        } else if (boost::iequals(compress_type, "BZ2")) {
+            format_type = TFileFormatType::FORMAT_CSV_BZ2;
+        } else if (boost::iequals(compress_type, "LZ4FRAME")) {
+            format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+        } else if (boost::iequals(compress_type, "LZOP")) {
+            format_type = TFileFormatType::FORMAT_CSV_LZOP;
+        } else if (boost::iequals(compress_type, "DEFLATE")) {
+            format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+        }
     } else if (boost::iequals(format_str, "JSON")) {
-        return TFileFormatType::FORMAT_JSON;
+        if (compress_type.empty()) {
+            format_type = TFileFormatType::FORMAT_JSON;
+        }
     }
-    return TFileFormatType::FORMAT_UNKNOWN;
+    return format_type;
 }
 
 static bool is_format_support_streaming(TFileFormatType::type format) {
     switch (format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_JSON:
         return true;
     default:
@@ -214,15 +242,15 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
     }
 
     // get format of this put
-    if (http_req->header(HTTP_FORMAT_KEY).empty()) {
-        ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
-    } else {
-        ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY));
-        if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-            std::stringstream ss;
-            ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY);
-            return Status::InternalError(ss.str());
-        }
+    if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
+        return Status::InternalError("compress data of JSON format is not supported.");
+    }
+    ctx->format =
+            parse_format(http_req->header(HTTP_FORMAT_KEY), http_req->header(HTTP_COMPRESS_TYPE));
+    if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
+        std::stringstream ss;
+        ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY);
+        return Status::InternalError(ss.str());
     }
 
     // check content length
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index d9bf046..14f3e17 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -47,6 +47,7 @@ static const std::string HTTP_MERGE_TYPE = "merge_type";
 static const std::string HTTP_DELETE_CONDITION = "delete";
 static const std::string HTTP_FUNCTION_COLUMN = "function_column";
 static const std::string HTTP_SEQUENCE_COL = "sequence_col";
+static const std::string HTTP_COMPRESS_TYPE = "compress_type";
 
 static const std::string HTTP_100_CONTINUE = "100-continue";
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 96e1212..6dbdad2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -105,7 +105,7 @@ enum TFileFormatType {
     FORMAT_PARQUET,
     FORMAT_CSV_DEFLATE,
     FORMAT_ORC,
-    FORMAT_JSON
+    FORMAT_JSON,
 }
 
 // One broker range information.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org